1

【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的...

 1 year ago
source link: https://www.cnblogs.com/weskynet/p/16441219.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【.NET+MQTT】.NET6 环境下实现MQTT通信,以及服务端、客户端的双边消息订阅与发布的代码演示

前言: MQTT广泛应用于工业物联网、智能家居、各类智能制造或各类自动化场景等。MQTT是一个基于客户端-服务器的消息发布/订阅传输协议,在很多受限的环境下,比如说机器与机器通信、机器与物联网通信等。好了,科普的废话不多说,下面直接通过.NET环境来实现一套MQTT通信demo,实现服务端与客户端的双边消息发布与订阅的功能和演示。

开发环境:

VS2022 + .NET 6 + Webapi / 控制台

1、新建一个webapi项目,用来后面做测试使用

1995789-20220703194134355-483218839.png

2、新建一个继承自IHostedService的服务,用于随着webapi程序的启动而自动执行。(最终代码在文末)

1995789-20220703194205010-1299395058.png

3、引入 MQTTNet 包,该项目提供了.net环境下的MQTT通信协议支持,这款框架很优秀,此处直接引用它来进行使用。

1995789-20220703194345070-1810279689.png

4、在上面的MqttHostService类里面,开始方法里面新增初始化MQTT服务端的一些功能,例如 IP、端口号、事件等等。

1995789-20220703194636641-1567439202.png

5、mqtt服务端支持的一系列功能很多,大佬们可以自行去尝试一些新发现,此处只使用若干个简单功能。

1995789-20220703194830275-502253778.png

6、添加客户端连接事件、连接关闭事件

1995789-20220703194928106-1460692685.png

7、由于事件要用的可能有点多,此处就不一一例举了,可以直接看以下的代码,以及有关注释来理解。

1995789-20220703195009822-97634286.png

8、事件触发时候,打印输出

1995789-20220703195107565-214206639.png

9、输出之前,记录一个当前事件名称标记一下,用于可以更加清楚看出是哪个事件输出的。

1995789-20220703215235523-678532482.png

10、对MqttHostService类进行注册,用于程序启动时候跟随启动。

1995789-20220703215345229-1564643322.png

11、上面貌似设计的不是很友好,所以把mqtt服务实例单独弄出来,写入到单独的类里面做成属性,供方便调用。

1995789-20220703215441638-987500302.png

12、把先前的一些东西改一下,换成使用上面步骤的属性来直接调用使用。

1995789-20220703215547355-2106860901.png

13、运行一下,看看是否可以成功,显示服务已启动,说明服务启动时OK的了.

1995789-20220703215702808-1751344784.png

14、新增一个控制台程序 MqttClient,用于模拟客户端。

1995789-20220703215756942-1232256291.png

15、创建客户端启动以及有关配置信息和有关事件,如图。具体使用可以看代码注释,就不过多解释了。

1995789-20220703215840102-114291994.png

16、在program类里面,调用客户端启动方法,用于测试使用。

1995789-20220703215941108-1286692271.png

17、上面客户端对应的三个事件的实现如图,同时进行有关信息的打印输出。

1995789-20220703220050242-1487085793.png

18、启动服务端,然后启动客户端,可以看到服务端有一个连接失败的消息,这个是因为上面配置的客户端用户名是admin,密码是1234567,而服务端配置的规则是,用户名是admin  密码是123456

1995789-20220703220148788-635156660.png

19、密码改回正常匹配项以后,再重新运行试试看,可以看到客户端与服务端连接上了。

1995789-20220703220343280-644593861.png

20、如果关闭客户端,也可以看到服务端会进入客户端关闭事件内。

1995789-20220703220439491-842049005.png

21、把上面主题订阅的内容写到连接成功以后的事件里面,不然客户端连接期间,可能就执行了主题订阅,会存在订阅失败的情况。改为写入到连接成功以后的事件里面,可以保证主题订阅肯定是在客户端连接成功以后才执行的。

1995789-20220703220523956-1567473251.png

22、接下来测试服务端消息推送,在MqttService服务里面,新增一个方法,用来执行mqtt服务端发布主题消息使用。有关配置信息和消息格式,如图所示。

1995789-20220703220719847-1172207829.png

23、新增一个API控制器,用来测试使用。API参数直接拿来进行消息的推送使用。

1995789-20220703220840811-199730676.png

24、运行服务端和客户端,并访问刚刚新增的api接口,手动随意输入一条消息,可以看到客户端订阅的主题消息已经被实时接收到了。

1995789-20220703220958165-278871284.png

25、接下来对客户端新增一个消息推送的方法,用来测试客户端消息发布的功能。有关消息格式和调用,如图所示,以及注释部分的说明。

1995789-20220703221108865-807389405.png

26、客户端program类里面,客户端连接以后,通过手动回车,来执行客户端发布消息。

1995789-20220703221255436-685914373.png

27、再次启动服务端和客户端

1995789-20220703221349692-703611602.png

28、然后客户端内按一下回车,执行消息发布功能。可以看到,服务端成功接收到了客户端发过来的主题消息。

1995789-20220703221428986-1395486033.png

29、接下来测试客户端与客户端之间的消息发布与订阅,为了模拟多客户端效果,把上面客户端已经编译好的文件拷贝一份出来。

1995789-20220703221534107-1276617126.png

30、然后本地的代码进行一些修改,用来当做第二个客户端程序。所以客户端id也进行变更为 testclient02

1995789-20220703221658862-1953759631.png

31、对客户端订阅的主题,也改成 topic_02

1995789-20220703221807134-1328871422.png

32、启动服务端,以及拷贝出来的客户端1,和上面修改了部分代码的客户端2,保证都已经连接上服务端。

1995789-20220703221837670-178570046.png

33、调用服务端的api接口,由于服务端发布的消息是发布给topic_01的,所以只有客户端1可以接收到消息。

1995789-20220703221930141-1503226518.png

34、客户端1执行回车,用于发布一段消息给主题 topic_02,可以看到客户端01发布的消息,同时被服务端和客户端02接收到了。因为服务端是总指挥,所以客户端发布的消息都会经过服务端,从而服务端都可以接收到连接的客户端发布的所有消息。

1995789-20220703222118765-1358166854.png

35、测试数据保持,下面先对客户端1进行断开,然后再重新连接客户端1,可以看到客户端1直接接收到了它订阅的主题的上一次最新的消息内容,这个就是消息里面,Retain属性设为True的结果,用于让服务端记忆该主题消息使用的。如果设为false,就没有这个效果了,大佬们也可以自己尝试。

1995789-20220703222414564-2056736783.png

36、最终的服务端代码:

MqttHostService:

  public class MqttHostService : IHostedService, IDisposable
    {
        public void Dispose()
        {
            
        }
        const string ServerClientId = "SERVER";
        public Task StartAsync(CancellationToken cancellationToken)
        {
            MqttServerOptionsBuilder optionsBuilder = new MqttServerOptionsBuilder();
            optionsBuilder.WithDefaultEndpoint();
            optionsBuilder.WithDefaultEndpointPort(10086); // 设置 服务端 端口号
            optionsBuilder.WithConnectionBacklog(1000); // 最大连接数
            MqttServerOptions options = optionsBuilder.Build();

            MqttService._mqttServer = new MqttFactory().CreateMqttServer(options);

            MqttService._mqttServer.ClientConnectedAsync += _mqttServer_ClientConnectedAsync; //客户端连接事件
            MqttService._mqttServer.ClientDisconnectedAsync += _mqttServer_ClientDisconnectedAsync; // 客户端关闭事件
            MqttService._mqttServer.ApplicationMessageNotConsumedAsync += _mqttServer_ApplicationMessageNotConsumedAsync; // 消息接收事件

            MqttService._mqttServer.ClientSubscribedTopicAsync += _mqttServer_ClientSubscribedTopicAsync; // 客户端订阅主题事件
            MqttService._mqttServer.ClientUnsubscribedTopicAsync += _mqttServer_ClientUnsubscribedTopicAsync; // 客户端取消订阅事件
            MqttService._mqttServer.StartedAsync += _mqttServer_StartedAsync; // 启动后事件
            MqttService._mqttServer.StoppedAsync += _mqttServer_StoppedAsync; // 关闭后事件
            MqttService._mqttServer.InterceptingPublishAsync += _mqttServer_InterceptingPublishAsync; // 消息接收事件
            MqttService._mqttServer.ValidatingConnectionAsync += _mqttServer_ValidatingConnectionAsync; // 用户名和密码验证有关

            MqttService._mqttServer.StartAsync();
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端订阅主题事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientSubscribedTopicAsync:客户端ID=【{arg.ClientId}】订阅的主题=【{arg.TopicFilter}】 ");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 关闭后事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StoppedAsync(EventArgs arg)
        {
            Console.WriteLine($"StoppedAsync:MQTT服务已关闭……");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 用户名和密码验证有关
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
        {
            arg.ReasonCode = MqttConnectReasonCode.Success;
            if ((arg.Username ?? string.Empty)!="admin" || (arg.Password??String.Empty)!="123456")
            {
                arg.ReasonCode = MqttConnectReasonCode.Banned;
                Console.WriteLine($"ValidatingConnectionAsync:客户端ID=【{arg.ClientId}】用户名或密码验证错误 ");

            }
            return Task.CompletedTask;
        }

        /// <summary>
        /// 消息接收事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            if (string.Equals(arg.ClientId, ServerClientId))
            {
                return Task.CompletedTask;
            }

            Console.WriteLine($"InterceptingPublishAsync:客户端ID=【{arg.ClientId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 启动后事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_StartedAsync(EventArgs arg)
        {
            Console.WriteLine($"StartedAsync:MQTT服务已启动……");
           return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端取消订阅事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
        {
            Console.WriteLine($"ClientUnsubscribedTopicAsync:客户端ID=【{arg.ClientId}】已取消订阅的主题=【{arg.TopicFilter}】  ");
            return Task.CompletedTask;
        }

        private Task _mqttServer_ApplicationMessageNotConsumedAsync(ApplicationMessageNotConsumedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageNotConsumedAsync:发送端ID=【{arg.SenderId}】 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 客户端断开时候触发
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        private Task _mqttServer_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"ClientDisconnectedAsync:客户端ID=【{arg.ClientId}】已断开, 地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;

        }

        /// <summary>
        /// 客户端连接时候触发
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttServer_ClientConnectedAsync(ClientConnectedEventArgs arg)
        {
            Console.WriteLine($"ClientConnectedAsync:客户端ID=【{arg.ClientId}】已连接, 用户名=【{arg.UserName}】地址=【{arg.Endpoint}】  ");
            return Task.CompletedTask;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
           return Task.CompletedTask;
        }
    }

MqttService:

 public class MqttService
    {
        public static MqttServer _mqttServer { get; set; }

        public static void PublishData(string data)
        {
            var message = new MqttApplicationMessage
            {
                Topic = "topic_01",
                Payload = Encoding.Default.GetBytes(data),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                Retain = true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
            };

            _mqttServer.InjectApplicationMessage(new InjectedMqttApplicationMessage(message) // 发送消息给有订阅 topic_01的客户端
            {
                SenderClientId = "Server_01"
            }).GetAwaiter().GetResult();
        }

    }

37、最终的客户端代码:

MqttClientService:

public class MqttClientService
    {
        public static IMqttClient _mqttClient;
        public void MqttClientStart()
        {
            var optionsBuilder = new MqttClientOptionsBuilder()
                .WithTcpServer("127.0.0.1", 10086) // 要访问的mqtt服务端的 ip 和 端口号
                .WithCredentials("admin", "123456") // 要访问的mqtt服务端的用户名和密码
                .WithClientId("testclient02") // 设置客户端id
                .WithCleanSession()
                .WithTls(new MqttClientOptionsBuilderTlsParameters
                {
                    UseTls = false  // 是否使用 tls加密
                });

            var clientOptions = optionsBuilder.Build();
            _mqttClient = new MqttFactory().CreateMqttClient();

            _mqttClient.ConnectedAsync += _mqttClient_ConnectedAsync; // 客户端连接成功事件
            _mqttClient.DisconnectedAsync += _mqttClient_DisconnectedAsync; // 客户端连接关闭事件
            _mqttClient.ApplicationMessageReceivedAsync += _mqttClient_ApplicationMessageReceivedAsync; // 收到消息事件

            _mqttClient.ConnectAsync(clientOptions);


        }

        /// <summary>
        /// 客户端连接关闭事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
        {
            Console.WriteLine($"客户端已断开与服务端的连接……");
            return Task.CompletedTask;
        }

        /// <summary>
        /// 客户端连接成功事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_ConnectedAsync(MqttClientConnectedEventArgs arg)
        {
            Console.WriteLine($"客户端已连接服务端……");

            // 订阅消息主题
            // MqttQualityOfServiceLevel: (QoS):  0 最多一次,接收者不确认收到消息,并且消息不被发送者存储和重新发送提供与底层 TCP 协议相同的保证。
            // 1: 保证一条消息至少有一次会传递给接收方。发送方存储消息,直到它从接收方收到确认收到消息的数据包。一条消息可以多次发送或传递。
            // 2: 保证每条消息仅由预期的收件人接收一次。级别2是最安全和最慢的服务质量级别,保证由发送方和接收方之间的至少两个请求/响应(四次握手)。
            _mqttClient.SubscribeAsync("topic_02", MqttQualityOfServiceLevel.AtLeastOnce);

            return Task.CompletedTask;
        }

        /// <summary>
        /// 收到消息事件
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        private Task _mqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
        {
            Console.WriteLine($"ApplicationMessageReceivedAsync:客户端ID=【{arg.ClientId}】接收到消息。 Topic主题=【{arg.ApplicationMessage.Topic}】 消息=【{Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)}】 qos等级=【{arg.ApplicationMessage.QualityOfServiceLevel}】");
            return Task.CompletedTask;
        }

        public void Publish(string data)
        {
            var message = new MqttApplicationMessage
            {
                Topic = "topic_02",
                Payload = Encoding.Default.GetBytes(data),
                QualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce,
                Retain = true  // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。
            };
            _mqttClient.PublishAsync(message);
        }
    }

38、后记:MQTT以上演示已经完毕,可以看到它的一些特性,跟websocket很接近,但是又比websocket通信更加灵活。其实,实际上MQTT的客户端在现实生产环境场景下,并不需要咱们开发者进行开发,很多硬件设备都支持提供MQTT协议的通信客户端,所以只需要自己搭建一个服务端,就可以实现实时监控各种设备推送过来的各种信号数据。同时客户端支持发布消息给其他客户端,所以就实现了设备与设备之间的一对一信号通信的效果了。如果需要下发信号给硬件设备,MQTT服务端也可以直接下发给某个指定设备来进行实现即可。上面案例只提供入门方案,如果有感兴趣的大佬,可以自己去拓展一下,来达到更好的效果。

39、以上就是该篇文章的所有内容。如果觉得有帮助,欢迎转发、点赞、推荐和评论留言。大佬们的鼓励,是我不断继续创作博客的动力之一。如果有兴趣一起探索更多.net 技术,欢迎点击下方qq群,加入一起吹牛谈人生。或者扫描下面我个人微信名片二维码加我好友,我也可以拉你到微信.net交流群。如果没有找到二维码和QQ群链接,可能是你现在进入的文章是爬虫爬走的文章,可以点击该文章原始地址[博客园]的链接来跳转回最初原文:https://www.cnblogs.com/weskynet/p/16441219.html

1995789-20220703223512513-1979455949.png

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK