42

golang nats queue模式

 5 years ago
source link: https://studygolang.com/articles/14298?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

队列订阅模式

此模式中,订阅者要指定两个属性,主题和队列(queue,其实就是队列名称)

注意:下面所有前提=必须订阅同一个主题

发布消息后,N个具有同样的主题和queue的订阅者,只有一个会收到消息。(random算法)

说明:queue=工作组,工作组中有N个worker,发布消息后,同一个工作组中,仅有一个worker会收到消息。

相同主题,不同queue的订阅者之间,不符合上面的描述。这种情况下,可以把同一个queue的订阅者们,当成一个订阅者来处理,这样就和普通的发布订阅模式一样了。

主题subj1,queue=q1的订阅者有sub1-q1,sub2-q1,sub3-q1

主题subj1,queue=q2的订阅者有sub1-q2,sub2-q2,sub3-q2

一个主题,两组订阅者,每组订阅者中各有3个订阅者。

对sub1发布消息,q1,q2两个组都会收到消息(发布订阅模式),q1,q2每个组中,分别仅有一个订阅者会收到消息(queue模式)

server

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "flag"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc  *nats.Conn
    err error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err) {
        //
    }
}

func main() {
    var (
        servername = flag.String("servername", "y", "name for server")
        queueGroup = flag.String("group", "", "group name for Subscribe")
        subj       = flag.String("subj", "", "subject name")
    )
    flag.Parse()

    log.Println(*servername, *queueGroup, *subj)
    startService(*subj, *servername+" worker1", *queueGroup)
    startService(*subj, *servername+" worker2", *queueGroup)
    startService(*subj, *servername+" worker3", *queueGroup)

    select {}
}

//receive message
func startService(subj, name, queue string) {
    go async(nc, subj, name, queue)
}

func async(nc *nats.Conn, subj, name, queue string) {
    nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
        log.Println(name, "Received a message From Async : ", string(msg.Data))
    })
}

func checkErr(err error) bool {
    if err != nil {
        log.Println(err)
        return false
    }
    return true
}

client

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "strconv"
    "github.com/pborman/uuid"
    "flag"
    "time"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc  *nats.Conn
    err error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err) {
        //
    }
}

func main() {
    var (
        subj = flag.String("subj", "", "subject name")
    )
    flag.Parse()
    log.Println(*subj)
    startClient(*subj)

    time.Sleep(time.Second)
}

//send message to server
func startClient(subj string) {
    for i := 0; i < 1; i++ {
        id := uuid.New()
        log.Println(id)
        nc.Publish(subj, []byte(id+" Sun "+strconv.Itoa(i)))
        nc.Publish(subj, []byte(id+" Rain "+strconv.Itoa(i)))
        nc.Publish(subj, []byte(id+" Fog "+strconv.Itoa(i)))
        nc.Publish(subj, []byte(id+" Cloudy "+strconv.Itoa(i)))
    }
}

func checkErr(err error) bool {
    if err != nil {
        log.Println(err)
        return false
    }
    return true
}

启动server A queue=g1,订阅主题=weather

./main -servername=A -group=g1 -subj=weather
2018/08/18 11:32:16 A g1 weather

启动server B queue=g1,订阅主题=weather

./main -servername=B -group=g1 -subj=weather
2018/08/18 11:32:21 B g1 weather

发送消息

./main -subj=weather
2018/08/18 11:32:24 weather
2018/08/18 11:32:24 3005ae7c-85ab-42d3-ad09-d44688d129ad

结果 server A收到消息

2018/08/18 11:32:24 A worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Rain 0
2018/08/18 11:32:24 A worker2 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Sun 0

结果 server B收到消息

2018/08/18 11:32:24 B worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Fog 0
2018/08/18 11:32:24 B worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Cloudy 0

主题相同,queue不同

启动server c queue=test,订阅主题=weather

> ./main -servername=C -group=test -subj=weather
2018/08/18 11:37:43 C test weather

发消息

./main -subj=weather
2018/08/18 11:37:47 weather
2018/08/18 11:37:47 b4e201dd-ea4a-4ec3-aa45-99489695f0c2

Server c 收到了全部消息

2018/08/18 11:37:47 C worker1 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0

Server A 收到3条消息

2018/08/18 11:37:47 A worker1 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0
2018/08/18 11:37:47 A worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0
2018/08/18 11:37:47 A worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0

Server B 收到1条消息

2018/08/18 11:37:47 B worker2 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0

总结:queue模式,在分发消息时,进行负载均衡,随机发送给同一组中的任意一个订阅者,可以随时增加删除订阅者,配合响应的监控数据和统计数据,对下游的业务进行自动伸缩。

提高系统的可用性,避免业务在单点处理导致系统瓶颈。

栗子:

比如用户登录,对login主题发送消息,积分系统订阅了login主题,收到login的消息后,对用户的积分进行处理。为了保证积分处理的高可用,可以使用相同的queue=score,启动多个积分处理服务。

监控积分业务的处理时间,如果某个积分处理服务,业务执行时间过长(比如由于某些/某类用户的特殊情况,积分算法不同等),造成了消息积压,不能及时处理。

在积分系统的下游仍有处理能力的时候(比如依赖下游的某个接口,此接口的处理能力依然是正常的),可以自动启动多个积分处理服务,订阅主题login,queue=score,分散计算压力。

如果是下游的处理能力受限,则可能要进行限流处理,不但不能启动多个积分处理服务,还要限制积分业务的处理速度。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK