9

laravel实现利用RabbitMQ实现MQTT即时通讯

 3 years ago
source link: https://aoppp.com/laravelshi-xian-li-yong-rabbitmqshi-xian-mqttji-shi-tong-xun/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

laravel实现利用RabbitMQ实现MQTT即时通讯

有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天功能,还有在支付过程中,当用户支付成功后,第三方支付服务会回调我们的回调接口,此时我们需要通知前端支付成功。而 RabbitMQ 可以很方便的实现即时通讯功能,如果你的业务只是少量地方使用即时通信,需要一个简易的消息系统,你可以直接考虑 MQ 的实现, MQ 有很高的吞吐率,具有持久化,还可以横向扩展,总之还不错,用就完了,奥利给!

本文需要安装好 rabbitMQlaravel ,没弄好环境的看我之前的文章 php laravel5.5使用rabbitmq消息队列

MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上。 MQTT 最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。

20201207154752

MQTT相关概念

实际上还是 MQ 的那些东西,主要看 MQ 有没有实现 MQTT 模型,懂的随便看看,不懂的先去理解 MQ

  • Publisher(发布者):消息的发出者,负责发送消息。
  • Subscriber(订阅者):消息的订阅者,负责接收并处理消息。
  • Broker(代理):消息代理,位于消息发布者和订阅者之间,各类支持MQTT协议的消息中间件都可以充当。
  • Topic(主题):可以理解为消息队列中的路由,订阅者订阅了主题之后,就可以收到发送到该主题的消息。
  • Payload(负载);可以理解为发送消息的内容。
  • QoS(消息质量):全称Quality of Service,即消息的发送质量,主要有QoS 0、QoS 1、QoS 2三个等级,下面分别介绍下:
  • QoS 0(Almost Once):至多一次,只发送一次,会发生消息丢失或重复;
  • QoS 1(Atleast Once):至少一次,确保消息到达,但消息重复可能会发生;
  • QoS 2(Exactly Once):只有一次,确保消息只到达一次。

RabbitMQ启用MQTT功能

我们是采用 docker 安装的,直接进入容器一顿操作就行

docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_mqtt

开启成功后,查看管理控制台,我们可以发现 MQTT 服务运行在 1883 端口上了。

20201207160935

MQTT客户端

我们可以使用 MQTT 客户端来测试 MQTT 的即时通讯功能,这里使用的是 MQTTBox 这个客户端工具。

首先下载并安装好 MQTTBox ,下载地址:http://workswithweb.com/mqttbox.html

20201207154931

点击 Create MQTT Client 按钮来创建一个 MQTT 客户端;

20201207154937

接下来对 MQTT 客户端进行配置,主要是配置好协议端口、连接用户名密码和QoS即可, 注意 Protocolmqtt/tcp

20201207161207

然后我们利用这个工具测试一下发布和订阅消息是否可用,一端向 TopicA 发送消息,另一端订阅 TopicA

20201207164033

可用看到效果已经出现了,那么我们如何让前端来订阅呢?

前端实现即时通讯

我们通过 html+javascript 实现一个简单的聊天功能,由于 RabbitMQWeb端 交互底层使用的是 WebSocket ,所以我们需要开启 RabbitMQMQTT WEB 支持,使用如下命令开启即可

docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_web_mqtt

开启成功后,查看管理控制台,我们可以发现 MQTTWEB 服务运行在 15675 端口上了;

WEB端MQTT 服务进行通讯需要使用一个叫 MQTT.js 的库,项目地址:https://github.com/mqttjs/MQTT.js

20201207155139

实现的功能非常简单,一个单聊功能,需要注意的是配置好 MQTT 服务的访问地址为:ws://localhost:15675/ws

<!DOCTYPE html>
<html lang="en">
<head>
	<meta charset="UTF-8">
	<title>Title</title>
</head>
<body>
<div>
	<label>目标Topic:<input id="targetTopicInput" type="text"></label><br>
	<label>发送消息:<input id="messageInput" type="text"></label><br>
	<button onclick="sendMessage()">发送</button>
	<button onclick="clearMessage()">清空</button>
	<div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
    //RabbitMQ的web-mqtt连接地址
    const url = 'ws://ip:15675/ws';
    //获取订阅的topic
    const topic = getQueryString("topic");
    //连接到消息队列
    let client = mqtt.connect(url);
    client.on('connect', function () {
        //连接成功后订阅topic
        client.subscribe(topic, function (err) {
            if (!err) {
                showMessage("订阅topic:" + topic + "成功!");
            }
        });
    });
    //获取订阅topic中的消息
    client.on('message', function (topic, message) {
        showMessage("收到消息:" + message.toString());
    });

    //发送消息
    function sendMessage() {
        let targetTopic = document.getElementById("targetTopicInput").value;
        let message = document.getElementById("messageInput").value;
        //向目标topic中发送消息
        client.publish(targetTopic, message);
        showMessage("发送消息给" + targetTopic + "的消息:" + message);
    }

    //从URL中获取参数
    function getQueryString(name) {
        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
        let r = window.location.search.substr(1).match(reg);
        if (r != null) {
            return decodeURIComponent(r[2]);
        }
        return null;
    }

    //在消息列表中展示消息
    function showMessage(message) {
        let messageDiv = document.getElementById("messageDiv");
        let messageEle = document.createElement("div");
        messageEle.innerText = message;
        messageDiv.appendChild(messageEle);
    }

    //清空消息列表
    function clearMessage() {
        let messageDiv = document.getElementById("messageDiv");
        messageDiv.innerHTML = "";
    }
</script>
</html>

在Laravel中使用

需要保证 laravelrabbitmq 已经可以正常生产和发布消息了,保证没问题再进行以下操作

  • 安装mqtt包
composer require salmanzafar/laravel-mqtt
  • app.php
/*
 * Application Service Providers...
 */
Salman\Mqtt\MqttServiceProvider::class,
  • MqttService
<?php

namespace App\Service;

use Illuminate\Support\Facades\Auth;
use Salman\Mqtt\MqttClass\Mqtt;

class MqttService
{

    public static function SendMsgViaMqtt($topic, $message)
    {
        $mqtt = new Mqtt();
        $client_id = Auth::user()->id ?? 0;
        $output = $mqtt->ConnectAndPublish($topic, $message, $client_id);

        if ($output === true)
        {
            return "published";
        }

        return "Failed";
    }
}
  • laravel生产消息
Route::get('/pub', function () {

    $data = [
        'name' => 'zhangsan',
        'age' => 18
    ];
    $res = \App\Service\MqttService::SendMsgViaMqtt('topicA', json_encode($data));
    dump($res);
});

请求路由测试

20201207163152

20201207163210

注意:通过url的queryString进行topic订阅

消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。像普通的订单下了给后台一个推送等等,都可以选择采用 MQ 实现,方便好用!奥利给!!

本文为作者原创或转载,允许转载,由憧憬在 aoppp.com发布 转载请说明文章出处。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK