28

如何在 Golang 中使用 MQTT

 3 years ago
source link: https://studygolang.com/articles/30841
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Golang 是 Google 开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。Go 的表现力强、简洁、干净、高效。它的并发机制使它能够轻松地编写程序,从而最大限度地利用多核和网络机器,而它新颖的类型系统则能够实现灵活和模块化的程序构造。Go 快速编译成机器代码,但又具有垃圾回收的便利性和运行时反射的强大功能。它是一种快速的、静态类型化的、编译后的语言,就像一种动态类型化的、解释的语言。

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

本文主要介绍如何在 Golang 项目中使用 paho.mqtt.golang 客户端库 ,实现客户端与 MQTT 服务器 的连接、订阅、收发消息等功能。

项目初始化

本项目基于 go1.13.12 进行开发测试

go version
go version go1.13.12 darwin/amd64

本项目使用 paho.mqtt.golang 作为 MQTT 客户端库,安装:

go get github.com/eclipse/paho.mqtt.golang

Go MQTT 使用

本文将使用 EMQ X 提供的 免费公共 MQTT 服务器 ,该服务基于 EMQ X 的 MQTT 物联网云平台 创建。服务器接入信息如下:

  • Broker: broker.emqx.io
  • TCP Port: 1883
  • Websocket Port: 8083

连接 MQTT 服务器

package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "time"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connect lost: %v", err)
}

func main() {
    var broker = "broker.emqx.io"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetClientID("go_mqtt_client")
    opts.SetUsername("emqx")
    opts.SetPassword("public")
    opts.SetDefaultPublishHandler(messagePubHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
  }
}
  • ClientOptions:用于设置 broker,端口,客户端 id ,用户名密码等选项
  • messagePubHandler:全局 MQTT pub 消息处理
  • connectHandler:连接的回调
  • connectLostHandler:连接丢失的回调

如果想使用 TLS 连接,可以如下设置:

func NewTlsConfig() *tls.Config {
    certpool := x509.NewCertPool()
    ca, err := ioutil.ReadFile("ca.pem")
    if err != nil {
        log.Fatalln(err.Error())
    }
    certpool.AppendCertsFromPEM(ca)
    // Import client certificate/key pair
    clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")
    if err != nil {
        panic(err)
    }
    return &tls.Config{
        RootCAs: certpool,
        ClientAuth: tls.NoClientCert,
        ClientCAs: nil,
        InsecureSkipVerify: true,
        Certificates: []tls.Certificate{clientKeyPair},
    }
}

如果不设置客户端证书,可以如下设置:

func NewTlsConfig() *tls.Config {
    certpool := x509.NewCertPool()
    ca, err := ioutil.ReadFile("ca.pem")
    if err != nil {
        log.Fatalln(err.Error())
    }
    certpool.AppendCertsFromPEM(ca)
    return &tls.Config{
        RootCAs: certpool,
}

然后设置 TLS

var broker = "broker.emqx.io"
var port = 8883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
tlsConfig := NewTlsConfig()
opts.SetTLSConfig(tlsConfig)
// other options

订阅

func sub(client mqtt.Client) {
    topic := "topic/test"
    token := client.Subscribe(topic, 1, nil)
    token.Wait()
    fmt.Printf("Subscribed to topic %s", topic)
}

发布消息

func publish(client mqtt.Client) {
    num := 10
    for i := 0; i < num; i++ {
        text := fmt.Sprintf("Message %d", i)
        token := client.Publish("topic/test", 0, false, text)
        token.Wait()
        time.Sleep(time.Second)
    }
}

测试

我们使用以下代码进行测试

package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "log"
    "time"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connect lost: %v", err)
}

func main() {
    var broker = "broker.emqx.io"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetClientID("go_mqtt_client")
    opts.SetUsername("emqx")
    opts.SetPassword("public")
    opts.SetDefaultPublishHandler(messagePubHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    sub(client)
    publish(client)

    client.Disconnect(250)
}

func publish(client mqtt.Client) {
    num := 10
    for i := 0; i < num; i++ {
        text := fmt.Sprintf("Message %d", i)
        token := client.Publish("topic/test", 0, false, text)
        token.Wait()
        time.Sleep(time.Second)
    }
}

func sub(client mqtt.Client) {
    topic := "topic/test"
    token := client.Subscribe(topic, 1, nil)
    token.Wait()
  fmt.Printf("Subscribed to topic: %s", topic)
}

运行代码,可以看到 MQTT 连接、订阅成功,并能成功收到订阅 topic 的消息

zQnUj27.png!mobile

总结

至此,我们完成了使用 paho.mqtt.golang 客户端连接到 公共 MQTT 服务器 ,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。

接下来我们将会陆续发布更多关于物联网开发及 MQTT 的相关文章,敬请关注。

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接: https://www.emqx.io/cn/blog/how-to-use-mqtt-in-golang

有疑问加站长微信联系

iiUfA3j.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK