76

golang-nsq系列(一)--初识

 4 years ago
source link: https://www.tuicool.com/articles/6NJNbuJ
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

nsq 最初是由 bitly 公司开源出来的一款简单易用的分布式消息中间件,它可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息。

1460000020110041?w=256&h=256

它具有以下特性:

分布式。它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和高可用特性。

易于扩展。它支持水平扩展,没有中心化的消息代理( Broker ),内置的发现服务让集群中增加节点非常容易。

运维方便。它非常容易配置和部署,灵活性高。

高度集成。现在已经有官方的 GolangPythonJavaScript 客户端,社区也有了其他各个语言的客户端库方便接入,自定义客户端也非常容易。

1. 首先到官方文档看用法:

https://nsq.io/overview/quick...

1460000020110042

下载对应的二进制可执行文件,在本地按照上述步骤就可以跑起来了,看下 nsqadmin 后台展示如下:

1460000020110043?w=1080&h=511

2. docker 环境搭建 nsq

参考官方提供资料创建:docker-compose.yml

version: '2' # 高版本支持3
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160" # tcp
      - "4161:4161" # http

  nsqd:
    image: nsqio/nsq
    # 广播地址不填的话默认就是oshostname(或虚拟机名称),那样 lookupd 会连接不上,所以直接写IP
    command: /nsqd --broadcast-address=10.236.92.208 --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150" # tcp
      - "4151:4151" # http

  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    depends_on:
      - nsqlookupd
    ports:
      - "4171:4171" # http

执行 docker-compose up -d 生成对应的三个容器:

nsqgo_nsqd_1

nsqgo_nsqlookupd_1

nsqgo_nsqadmin_1

3. Golang 使用 nsq

创建生产者:producer.go

package main

import (
  "fmt"
  "log"
  "time"

  "github.com/nsqio/go-nsq"
)

func main() {
  config := nsq.NewConfig()
  p, err := nsq.NewProducer("127.0.0.1:4150", config)

  if err != nil {
    log.Panic(err)
  }

  for i := 0; i < 1000; i++ {
    msg := fmt.Sprintf("num-%d", i)
    log.Println("Pub:" + msg)
    err = p.Publish("testTopic", []byte(msg))
    if err != nil {
      log.Panic(err)
    }
    time.Sleep(time.Second * 1)
  }

  p.Stop()
}

循环写 1000 个 num-1--1000,通过 p.Publish 发送到消息队列中,等待消费。

创建消费者:consumer.go

package main

import (
  "log"
  "sync"

  "github.com/nsqio/go-nsq"
)

func main() {
  wg := &sync.WaitGroup{}
  wg.Add(1000)

  config := nsq.NewConfig()
  c, _ := nsq.NewConsumer("testTopic", "ch", config)
  c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
    log.Printf("Got a message: %s", message.Body)
    wg.Done()
    return nil
  }))

  // 1.直连nsqd
  // err := c.ConnectToNSQD("127.0.0.1:4150")

  // 2.通过 nsqlookupd 服务发现
  err := c.ConnectToNSQLookupd("127.0.0.1:4161")
  if err != nil {
    log.Panic(err)
  }
  wg.Wait()
}

可通过两种方式与 nsqd 连接:

1). 直连 nsqd ,适用于单机( standalone )版;

2). 通过 nsqlookupd 服务发现,适用于集群( cluster )版;

输出结果:

go run producer.go

2019/08/18 20:29:51 Pub:num-0
2019/08/18 20:29:51 INF    1 (127.0.0.1:4150) connecting to nsqd
2019/08/18 20:29:52 Pub:num-1
2019/08/18 20:29:53 Pub:num-2
2019/08/18 20:29:54 Pub:num-3
2019/08/18 20:29:55 Pub:num-4
2019/08/18 20:29:56 Pub:num-5
2019/08/18 20:29:57 Pub:num-6
2019/08/18 20:29:58 Pub:num-7
2019/08/18 20:29:59 Pub:num-8
2019/08/18 20:30:00 Pub:num-9
2019/08/18 20:30:01 Pub:num-10

go run consumer.go

2019/08/18 20:30:08 INF    1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2019/08/18 20:30:08 INF    1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
2019/08/18 20:30:08 Got a message: num-0
2019/08/18 20:30:08 Got a message: num-1
2019/08/18 20:30:08 Got a message: num-2
2019/08/18 20:30:08 Got a message: num-3
2019/08/18 20:30:08 Got a message: num-4
2019/08/18 20:30:08 Got a message: num-5
2019/08/18 20:30:08 Got a message: num-6
2019/08/18 20:30:08 Got a message: num-7
2019/08/18 20:30:08 Got a message: num-8
2019/08/18 20:30:08 Got a message: num-9
2019/08/18 20:30:08 Got a message: num-10

github 源码下载:

https://github.com/astraw99/n...

【小结】

a. 单个nsqd可以有多个topic,每个topic可以有多个channel。channel接收这个topic所有消息的副本,从而实现多播分发,而channel上的每个消息被均匀的分发给它的订阅者,从而实现负载均衡;

b. nsq 专门为分布式、集群化而生,在处理 SPOF(single point of failure, 单点故障)、高可用、最终一致性方面很有优势。

1460000020110044


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK