41

基于 ThinkJS 的 WebSocket 通信详解

 4 years ago
source link: https://www.tuicool.com/articles/bqeeAnv
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

基于 ThinkJS 的 WebSocket 通信详解

ErUvMvU.png!web

前言

我们的项目是基于 ThinkJS + Vue 开发的,最近实现了一个多端实时同步数据的功能,所以想写一篇文章来介绍下如何在 ThinkJS 的项目中利用 WebSocket 实现多端的实时通信。ThinkJS 是基于 Koa 2 开发的企业级 Node.js 服务端框架,文章中会从零开始实现一个简单的聊天室,希望读者们能有所收获。

WebSocket

WebSocket 是 HTML5 中提出的一种协议。它的出现是为了解决客户端和服务端的实时通信问题。在 WebSocket 出现之前,如果想实现实时消息传递一般有两种方式:

  1. 客户端通过轮询不停的向服务端发送请求,如果有新消息客户端进行更新。这种方式的缺点很明显,客户端需要不停向服务器发送请求,然而 HTTP 请求可能包含较长的头部,其中真正有效的数据可能只是很小的一部分,显然这样会浪费很多带宽资源
  2. HTTP 长连接,客户端通过 HTTP 请求连接到服务端后, 底层的 TCP 连接不会马上断开,后续的信息还是可以通过同一个连接来传输。这种方式有一个问题是每个连接会占用服务端资源,在收到消息后连接断开,就需要重新发送请求。如此循环往复。

可以看到,这两种实现方式的本质还是客户端向服务端“Pull”的过程,并没有一个服务端主动“Push”到客户端的方式,所有的方式都是依赖客户端先发起请求。为了满足两方的实时通信, WebSocket 应运而生。

WebSocket 协议

首先,WebSocket 是基于 HTTP 协议的,或者说借用了 HTTP 协议来完成连接的握手部分。其次,WebSocket 是一个持久化协议,相对于 HTTP 这种非持久的协议来说,一个 HTTP 请求在收到服务端回复后会直接断开连接,下次获取消息需要重新发送 HTTP 请求,而 WebSocket 在连接成功后可以保持连接状态。下图应该能体现两者的关系:

2qeQvya.png!web

在发起 WebSocket 请求时需要先通过 HTTP 请求告诉服务端需求将协议升级为 WebSocket。

浏览器先发送请求:

GET / HTTP/1.1
Host: localhost:8080
Origin: [url=http://127.0.0.1:3000]http://127.0.0.1:3000[/url]
Connection: Upgrade
Upgrade: WebSocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==

服务端回应请求:

HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: WebSocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=

在请求头中核心的部分是 Connection 和 Upgrade ,通过这两个字段服务端会将 HTTP 升级为 WebSocket 协议。服务端返回对应信息后连接成功,客户端和服务端就可以正常通信了。

随着新标准的推进,WebSocket 已经比较成熟了,并且各个主流浏览器对 WebSocket 的支持情况比较好(不兼容低版本 IE,IE 10 以下)

6JfeMrA.png!web

Socket.io

Socket.io 是一个完全由 JavaScript 实现、基于 Node.js、支持 WebSocket 协议的用于实时通信、跨平台的开源框架。它包括了客户端的 JavaScript 和服务器端的 Node.js,并且有着很好的兼容性,会根据浏览器的支持情况选择不同的方式进行通讯,如上面介绍的轮询和 HTTP 长连接。

简易聊天室

对于 WebSocket 目前 ThinkJS 支持了 Socket.io 并对其进行了一些简单的包装,只需要进行一些简单的配置就可

以使用 WebSocket 了。

服务端配置

stickyCluster

ThinkJS 默认采用了多进程模型,每次请求会根据策略输送到不同的进程中执行,关于其多进程模型可以参考 《细谈 ThinkJS 多进程模型》 。 而 WebSocket 连接前需要使用 HTTP 请求来完成握手升级,多个请求需要保证命中相同的进程,才能保证握手成功。这个时候就需要开启 StickyCluster 功能,使客户端所有的请求命中同一进程。修改配置文件 src/config/config.js 即可。

module.exports = {
    stickyCluster: true,
    // ...
}

添加 WebSocket 配置

src/config/extend.js 引入 WebSocket:

const websocket = require('think-websocket');
module.exports = [
  // ...
  websocket(think.app),
];

src/config/adapter.js 文件中配置 WebSocket

const socketio = require('think-websocket-socket.io');
exports.websocket = {
  type: 'socketio',
  common: {
    // common config
  },
  socketio: {
    handle: socketio,
    messages: {
      open: '/websocket/open', //建立连接时处理对应到 websocket Controller 下的 open Action
      close: '/websocket/close', // 关闭连接时处理的 Action
      room: '/websocket/room' // room 事件处理的 Action
    }
  }
}

配置中的 message 对应着事件的映射关系。比如上述的例子,客户端触发 room 事件,服务端需要在 websocket controller 下的 roomAction 中处理消息。

添加 WebSocket 实现

创建处理消息的 controller 文件。上面的配置是 /websocket/xxx ,所以直接在项目根目录 src/controller 下创建 websocket.js 文件。

module.exports = class extends think.Controller {
// this.socket 为发送消息的客户端对应的 socket 实例, this.io 为Socket.io 的一个实例
  constructor(...arg) {
    super(...arg);
    this.io = this.ctx.req.io;
    this.socket = this.ctx.req.websocket;
  }
  async openAction() {
    this.socket.emit('open', 'websocket success')
  }
  
  closeAction() {
    this.socket.disconnect(true);
  }
};

这时候服务端代码就已经配置完了。

客户端配置

客户端代码使用比较简单,只需要引入 socket.io.js 就可以直接使用了。

<script src="https://lib.baomitu.com/socket.io/2.0.1/socket.io.js"></script>

引入后在初始化代码创建 WebSocket 连接:

this.socket = io();
this.socket.on('open', data => {
    console.log('open', data)
})

这样一个最简单的 WebSocket 的 demo 就完成了,打开页面的时候会自动创建一个 WebSocket 连接,创建成功后服务端会触发 open 事件,客户端在监听的 open 事件中会接收到服务端返回的 websocket success 字符串。

接下来我们开始实现一个简单的聊天室。

简易聊天室的实现

从刚才的内容中我们知道每个 WebSocket 连接的创建会有一个 Socket 句柄创建,对应到代码中的 this.socket 变量。所以本质上聊天室人与人的通信可以转换成每个人对应的 Socket 句柄的通信。我只需要找到这个人对应的 Socket 句柄,就能实现给对方发送消息了。

简单来实现我们可以设置一个全局变量来存储连接到服务端的 WebSocket 的一些信息。在 src/bootstrap/global.js  中设置全局变量:

global.$socketChat = {};

然后在 src/bootstrap/worker.js  中引入global.js,使全局变量生效。

require('./global');

然后在服务端 controller 增加 roomActionmessageActionmessageAction 用来接收客户端用户的聊天信息,并将信息发送给所有的客户端成员。 roomAction 用来接收客户端进入/离开聊天室的信息。这两个的区别是聊天消息是需要同步到所有的成员所以使用 this.io.emit ,聊天室消息是同步到所有除当前客户端外的所有成员所以使用 this.socket.broadcast.emit

module.exports = class extends think.Controller {
    constructor(...arg) {
        super(...arg);
        this.io = this.ctx.req.io;
        this.socket = this.ctx.req.websocket;
        global.$socketChat.io = this.io;
    }

    async messageAction() {
        this.io.emit('message', {
            nickname: this.wsData.nickname,
            type: 'message',
            message: this.wsData.message,
            id: this.socket.id
        })
    }
    async roomAction() {
        global.$socketChat[this.socket.id] = {
          nickname: this.wsData.nickname,
          socket: this.socket
        }
        this.socket.broadcast.emit('room', {
            nickname: this.wsData.nickname,
            type: 'in',
            id: this.socket.id
        })
    }
    async closeAction() {
        const closeSocket = global.$socketChat[this.socket.id];
        const nickname = closeSocket && closeSocket.nickname;
        this.socket.disconnect(true);
        this.socket.removeAllListeners();
        this.socket.broadcast.emit('room', {
            nickname,
            type: 'out',
            id: this.socket.id
        })
        delete global.$socketChat[this.socket.id]
    }
}

客户端通过监听服务端 emit 的事件来处理信息

this.socket.on('message', data => {
    // 通过socket的id的对比,判断消息的发送方
    data.isMe = (data.id === this.socket.id);
    this.chatData.push(data);
})
this.socket.on('room', (data) => {
    this.chatData.push(data);
})

通过 emit 服务端对应的 action 来发送消息

this.socket.emit('room', {
    nickname: this.nickname
})
this.socket.emit('message', {
    message: this.chatMsg,
    nickname: this.nickname
})

根据发送/接收消息的type判断消息类型

<div class="chat-box">
    <div v-for="(item, index) in chatData" :key="index">
    <p v-if="item.type == 'in'" class="enter-tip">{{item.nickname}}进入聊天室</p>
    <p v-if="item.type == 'out'" class="enter-tip">{{item.nickname}}离开聊天室</p>
    <p v-else-if="item.type == 'message'" :class="['message',{'me':item.isMe}]">
        {{item.nickname}}:{{item.message}}
    </p>
    </div>
</div>

至此一个简单的聊天室就完成了。

VBzUn2f.gif

多节点通信问题

刚才我们说了通信的本质其实是 Socket 句柄查询使用的过程,本质上我们是利用全局变量存储所有的 WebSocket 句柄的方式解决了 WebSocket 连接查找的问题。但是当我们的服务端扩容后,会出现多个服务器都有 WebSocket 连接,这个时候跨节点的 WebSocket 连接查找使用全局变量的方式就无效了。此时我们就就需要换一种方式来实现跨服务器的通信同步,一般有以下几种方式:

消息队列

发送消息不直接执行 emit 事件,而是将消息发送到消息队列中,然后所有的节点对这条消息进行消费。拿到数据后查看接收方的 WebSocket 连接是否在当前节点上,不在的话就忽略这条数据,在的话则执行发送的动作。

节点通信

通过外部存储服务例如 Redis 充当之前的“全局变量”的角色,所有的节点创建 WebSocket 连接后都向 Redis 中注册一下,告诉大家有个叫 “A” 家伙的连接在  “192.168.1.1” 这。当 B 要向 A 发送消息的时候它去 Redis 中查找到 A 的连接所处的节点后,通知 192.168.1.1 这个节点 B 要向 A 发送消息,然后节点会执行发送的动作。

基于 Redis 的节点通信实现

Redis 的 pub/sub 是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。WebSocket 的一个节点接收到消息后,通过 Redis 发布(pub),其他节点作为订阅者(sub)接收消息再进行后续处理。

这次我们将在聊天室的 demo 上实现节点通信的功能。

首先,在 websocket controller 文件中增加接口调用

const ip = require('ip');
const host = ip.address();

module.exports = class extends think.Controller {
  async openAction() {
      // 记录当前 WebSocket 连接到的服务器ip
      await global.rediser.hset('-socket-chat', host, 1);
  }
  
  emit(action, data) {
      if (action === 'message') {
      this.io.emit(action, data)
    } else {
      this.socket.broadcast.emit(action, data);
    }
    this.crossSync(action, data)
  }

  
  async messageAction() {
    const data = {
      nickname: this.wsData.nickname,
      type: 'message',
      message: this.wsData.message,
      id: this.socket.id
    };
    this.emit('message', data);
  }
  
  async closeAction() {
      const connectSocketCount = Object.keys(this.io.sockets.connected).length;
      this.crossSync(action, data);
      if (connectSocketCount <= 0) {
        await global.rediser.hdel('-socket-chat', host);
      }
  }

  async crossSync(action, params) {
    const ips = await global.rediser.hkeys('-socket-chat').filter(ip => ip !== host);
    ips.forEach(ip => request({
        method: 'POST',
        uri: `http://${ip}/api/websocket/sync`,
        form: {
          action,
          data: JSON.stringify(params)
        },
        json: true
      });
    );
  }
}

然后在 src/controller/api/websocket 实现通信接口

const Base = require('../base');

module.exports = class extends Base {
  async syncAction() {
    const {action, data} = this.post();
    const blackApi = ['room', 'message', 'close', 'open'];
    if (!blackApi.includes(action)) return this.fail();
    
    // 由于是跨服务器接口,所以直接使用io.emit发送给当前所有客户端
    const io = global.$socketChat.io;
    io && io.emit(action, JSON.parse(data));
  }
};

这样就实现了跨服务的通信功能,当然这只是一个简单的 demo ,但是基本原理是相同的。

rQnqMzq.gif

socket.io-redis

第二种 Redis (sub/pub) 的方式,socket.io 提供了一种官方的库 socket.io-redis 来实现。它在 Redis 的 pub/sub 功能上进行了封装,让开发者可以忽略 Redis 相关的部分,方便了开发者使用。使用时只需要传入 Redis 的配置即可。

// Thinkjs socket.io-redis 配置
const redis = require('socket.io-redis');
exports.websocket = {
  ...
  socketio: {
    adapter: redis({ host: 'localhost', port: 6379 }),
    message: {
        ...
    }
  }
}
  
// then controller websocket.js
this.io.emit('hi', 'all sockets');

HTTP 与 WebSocket 通信

如果想通过非 socket.io 进程向 socket.io 服务通信,例如:HTTP,可以使用官方的 socket.io-emitter 库。使用方式如下:

var io = require('socket.io-emitter')({ host: '127.0.0.1', port: 6379 });
setInterval(function(){
  io.emit('time', new Date);
}, 5000);

后记

整个聊天室的代码已经上传到github,大家可以直接下载体验 聊天室示例


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK