4

跟着源码学IM(八):万字长文,手把手教你用Netty打造IM聊天

 3 years ago
source link: http://www.blogjava.net/jb2011/archive/2021/04/12/435852.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本文作者芋艿,原题“使用 Netty 实现 IM 聊天贼简单”,本底价有修订和改动。

一、本文引言

上篇《跟着源码学IM(七):手把手教你用WebSocket打造Web端IM聊天》中,我们使用 WebSocket 实现了一个简单的 IM 功能,支持身份认证、私聊消息、群聊消息。

然后就有人发私信,希望使用纯 Netty 实现一个类似的功能,因此就有了本文。

注:源码请从同步链接附件中下载,http://www.52im.net/thread-3489-1-1.html。

学习交流:

- 即时通讯/推送技术开发交流5群:215477170 [推荐]

- 移动端IM开发入门文章:《新手入门一篇就够:从零开发移动端IM

- 开源IM框架源码:https://github.com/JackJiang2011/MobileIMSDK

(本文同步发布于:http://www.52im.net/thread-3489-1-1.html

二、知识准备

可能有人不知道 Netty 是什么,这里简单介绍下:

Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。

Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。

以下是几篇有关Netty的入门文章,值得一读:

如果你连Java的NIO都不知道是什么,下面的文章建议优先读一下:

Netty源码和API的在线阅读地址:

三、本文源码

本文完整代码附件下载:请从同步链接附件中下载,http://www.52im.net/thread-3489-1-1.html。

源码的目录结构,如下图所示: 

如上图所示:

  • 1)lab-67-netty-demo-server 项目:搭建 Netty 服务端;
  • 2)lab-67-netty-demo-client 项目:搭建 Netty 客户端;
  • 3)lab-67-netty-demo-common 项目:提供 Netty 的基础封装,提供消息的编解码、分发的功能。

另外,源码中也会提供 Netty 常用功能的示例:

  • 1)心跳机制,实现服务端对客户端的存活检测;
  • 2)断线重连,实现客户端对服务端的重新连接。

不哔哔,直接开干。

五、通信协议

在上一章中,我们实现了客户端和服务端的连接功能。而本小节,我们要让它们两能够说上话,即进行数据的读写。

在日常项目的开发中,前端和后端之间采用 HTTP 作为通信协议,使用文本内容进行交互,数据格式一般是 JSON。但是在 TCP 的世界里,我们需要自己基于二进制构建,构建客户端和服务端的通信协议。

我们以客户端向服务端发送消息来举个例子,假设客户端要发送一个登录请求。

对应的类如下:

public class AuthRequest {

    /** 用户名 **/

    private String username;

    /** 密码 **/

    private String password;

显然:我们无法将一个 Java 对象直接丢到 TCP Socket 当中,而是需要将其转换成 byte 字节数组,才能写入到 TCP Socket 中去。即,需要将消息对象通过序列化,转换成 byte 字节数组。

同时:在服务端收到 byte 字节数组时,需要将其又转换成 Java 对象,即反序列化。不然,服务端对着一串 byte 字节处理个毛线?!

友情提示:服务端向客户端发消息,也是一样的过程哈!

序列化的工具非常多,例如说 Google 提供的 Protobuf,性能高效,且序列化出来的二进制数据较小。Netty 对 Protobuf 进行集成,提供了相应的编解码器。

如下图所示: 

但是考虑到很多可能对 Protobuf 并不了解,因为它实现序列化又增加额外学习成本。因此,仔细一个捉摸,还是采用 JSON 方式进行序列化。可能有人会疑惑,JSON 不是将对象转换成字符串吗?嘿嘿,我们再把字符串转换成 byte 字节数组就可以啦~

下面,我们新建 lab-67-netty-demo-common 项目,并在 codec 包下,实现我们自定义的通信协议。

如下图所示:

5.1、Invocation

创建 Invocation 类,通信协议的消息体。

代码如下:

 * 通信协议的消息体

public class Invocation {

    private String type;

     * 消息,JSON 格式

    private String message;

    // 空构造方法

    public Invocation() {

    public Invocation(String type, String message) {

        this.type = type;

        this.message = message;

    public Invocation(String type, Message message) {

        this.type = type;

        this.message = JSON.toJSONString(message);

    // ... 省略 setter、getter、toString 方法

 type 属性,类型,用于匹配对应的消息处理器。如果类比 HTTP 协议,type 属性相当于请求地址。

 message 属性,消息内容,使用 JSON 格式。

另外,Message 是我们定义的消息接口,代码如下:

public interface Message {

    // ... 空,作为标记接口

5.2、粘包与拆包

在开始看 Invocation 的编解码处理器之前,我们先了解下粘包与拆包的概念。

5.2.1 产生原因

产生粘包和拆包问题的主要原因是,操作系统在发送 TCP 数据的时候,底层会有一个缓冲区,例如 1024 个字节大小。

如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP 则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。

例如说:在《详解 Socket 编程 --- TCP_NODELAY 选项》文章中我们可以看到,在关闭 Nagle 算法时,请求不会等待满足缓冲区大小,而是尽快发出,降低延迟。

如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP 就会将其拆分为多次发送,这就是拆包,也就是将一个大的包拆分为多个小包进行发送。

如下图展示了粘包和拆包的一个示意图,演示了粘包和拆包的三种情况: 

如上图所示:

  • 1)A 和 B 两个包都刚好满足 TCP 缓冲区的大小,或者说其等待时间已经达到 TCP 等待时长,从而还是使用两个独立的包进行发送;
  • 2)A 和 B 两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端;
  • 3)B 包比较大,因而将其拆分为两个包 B_1 和 B_2 进行发送,而这里由于拆分后的 B_2 比较小,其又与 A 包合并在一起发送。

5.2.2 解决方案

对于粘包和拆包问题,常见的解决方案有三种。

 客户端在发送数据包的时候,每个包都固定长度。比如 1024 个字节大小,如果客户端发送的数据长度不足 1024 个字节,则通过补充空格的方式补全到指定长度。

这种方式,暂时没有找到采用这种方式的案例。

 客户端在每个包的末尾使用固定的分隔符。例如 \r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的 \r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包。具体的案例,有 HTTP、WebSocket、Redis。

 将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。

友情提示:方案 ③ 是 ① 的升级版,动态长度。

本文将采用这种方式,在每次 Invocation 序列化成字节数组写入 TCP Socket 之前,先将字节数组的长度写到其中。

如下图所示: 

5.3、InvocationEncoder

创建 InvocationEncoder 类,实现将 Invocation 序列化,并写入到 TCP Socket 中。

代码如下:

public class InvocationEncoder extends MessageToByteEncoder<Invocation> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override

    protected void encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) {

        // <2.1> 将 Invocation 转换成 byte[] 数组

        byte[] content = JSON.toJSONBytes(invocation);

        // <2.2> 写入 length

        out.writeInt(content.length);

        // <2.3> 写入内容

        out.writeBytes(content);

        logger.info("[encode][连接({}) 编码了一条消息({})]", ctx.channel().id(), invocation.toString());

 MessageToByteEncoder 是 Netty 定义的编码 ChannelHandler 抽象类,将泛型 消息转换成字节数组。

 #encode(ChannelHandlerContext ctx, Invocation invocation, ByteBuf out) 方法,进行编码的逻辑。

<2.1> 处,调用 JSON 的 #toJSONBytes(Object object, SerializerFeature... features) 方法,将 Invocation 转换成 字节数组。

<2.2> 处,将字节数组的长度,写入到 TCP Socket 当中。这样,后续「5.4 InvocationDecoder」可以根据该长度,解析到消息,解决粘包和拆包的问题。

友情提示:MessageToByteEncoder 会最终将 ByteBuf out 写到 TCP Socket 中。

<2.3> 处,将字节数组,写入到 TCP Socket 当中。

5.4、InvocationDecoder

创建 InvocationDecoder 类,实现从 TCP Socket 读取字节数组,反序列化成 Invocation。

代码如下: 

 ByteToMessageDecoder 是 Netty 定义的解码 ChannelHandler 抽象类,在 TCP Socket 读取到新数据时,触发进行解码。

 在 <2.1>、<2.2>、<2.3> 处,从 TCP Socket 中读取长度。

 在 <3.1>、<3.2>、<3.3> 处,从 TCP Socket 中读取字节数组,并反序列化成 Invocation 对象。

最终,添加 List<Object> out 中,交给后续的 ChannelHandler 进行处理。稍后,我们将在「6. 消息分发」小结中,会看到 MessageDispatcher 将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

5.5、引入依赖

创建 pom.xml 文件,引入 Netty、FastJSON 等等依赖。

5.6、本章小结

至此,我们已经完成通信协议的定义、编解码的逻辑,是不是蛮有趣的?!

另外,我们在 NettyServerHandlerInitializer 和 NettyClientHandlerInitializer 的初始化代码中,将编解码器添加到其中。

如下图所示: 

六、消息分发

在 SpringMVC 中,DispatcherServlet 会根据请求地址、方法等,将请求分发到匹配的 Controller 的 Method 方法上。

在 lab-67-netty-demo-client 项目的 dispatcher 包中,我们创建了 MessageDispatcher 类,实现和 DispatcherServlet 类似的功能,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。 

下面,我们来看看具体的代码实现。

6.1、Message

创建 Message 接口,定义消息的标记接口。

代码如下:

public interface Message {

下图,是我们涉及到的 Message 实现类。

如下图所示:

6.2、MessageHandler

创建 MessageHandler 接口,消息处理器接口。

代码如下:

public interface MessageHandler<T extendsMessage> {

     * 执行处理消息

     * @param channel 通道

     * @param message 消息

    voide xecute(Channel channel, T message);

     * @return 消息类型,即每个 Message 实现类上的 TYPE 静态字段

    String getType();

如上述代码所示:

  • 1)定义了泛型 <T> ,需要是 Message 的实现类;
  • 2)定义的两个接口方法,自己看下注释哈。

下图,是我们涉及到的 MessageHandler 实现类。

如下图所示: 

6.3、MessageHandlerContainer

创建 MessageHandlerContainer 类,作为 MessageHandler 的容器。

代码如下: 

 实现 InitializingBean 接口,在 #afterPropertiesSet() 方法中,扫描所有 MessageHandler Bean ,添加到 MessageHandler 集合中。

 在 #getMessageHandler(String type) 方法中,获得类型对应的 MessageHandler 对象。稍后,我们会在 MessageDispatcher 调用该方法。

 在 #getMessageClass(MessageHandler handler) 方法中,通过 MessageHandler 中,通过解析其类上的泛型,获得消息类型对应的 Class 类。这是参考 rocketmq-spring 项目的 DefaultRocketMQListenerContainer#getMessageType() 方法,进行略微修改。

6.4、MessageDispatcher

创建 MessageDispatcher 类,将 Invocation 分发到其对应的 MessageHandler 中,进行业务逻辑的执行。

代码如下:

@ChannelHandler.Sharable

public class MessageDispatcher extends SimpleChannelInboundHandler<Invocation> {

    @Autowired

    private MessageHandlerContainer messageHandlerContainer;

    private final ExecutorService executor =  Executors.newFixedThreadPool(200);

    @Override

    protected void channelRead0(ChannelHandlerContext ctx, Invocation invocation) {

        // <3.1> 获得 type 对应的 MessageHandler 处理器

        MessageHandler messageHandler = messageHandlerContainer.getMessageHandler(invocation.getType());

        // 获得  MessageHandler 处理器的消息类

        Class<? extendsMessage> messageClass = MessageHandlerContainer.getMessageClass(messageHandler);

        // <3.2> 解析消息

        Message message = JSON.parseObject(invocation.getMessage(), messageClass);

        // <3.3> 执行逻辑

        executor.submit(newRunnable() {

            @Override

            public void run() {

                // noinspection unchecked

                messageHandler.execute(ctx.channel(), message);

① 在类上添加 @ChannelHandler.Sharable 注解,标记这个 ChannelHandler 可以被多个 Channel 使用。

② SimpleChannelInboundHandler 是 Netty 定义的消息处理 ChannelHandler 抽象类,处理消息的类型是 <I> 泛型时。

③ #channelRead0(ChannelHandlerContext ctx, Invocation invocation) 方法,处理消息,进行分发。

<3.1> 处,调用 MessageHandlerContainer 的 #getMessageHandler(String type) 方法,获得 Invocation 的 type 对应的 MessageHandler 处理器。

然后,调用 MessageHandlerContainer 的 #getMessageClass(messageHandler) 方法,获得  MessageHandler 处理器的消息类。

<3.2> 处,调用 JSON 的 ## parseObject(String text, Class<T> clazz) 方法,将 Invocation 的 message 解析成 MessageHandler 对应的消息对象。

<3.3> 处,丢到线程池中,然后调用 MessageHandler 的 #execute(Channel channel, T message) 方法,执行业务逻辑。

注意:为什么要丢到 executor 线程池中呢?我们先来了解下 EventGroup 的线程模型。

友情提示:在我们启动 Netty 服务端或者客户端时,都会设置其 EventGroup。

EventGroup 我们可以先简单理解成一个线程池,并且线程池的大小仅仅是 CPU 数量 * 2。每个 Channel 仅仅会被分配到其中的一个线程上,进行数据的读写。并且,多个 Channel 会共享一个线程,即使用同一个线程进行数据的读写。

那么试着思考下,MessageHandler 的具体逻辑视线中,往往会涉及到 IO 处理,例如说进行数据库的读取。这样,就会导致一个 Channel 在执行 MessageHandler 的过程中,阻塞了共享当前线程的其它 Channel 的数据读取。

因此,我们在这里创建了 executor 线程池,进行 MessageHandler 的逻辑执行,避免阻塞 Channel 的数据读取。

可能会有人说,我们是不是能够把 EventGroup 的线程池设置大一点,例如说 200 呢?对于长连接的 Netty 服务端,往往会有 1000 ~ 100000 的 Netty 客户端连接上来,这样无论设置多大的线程池,都会出现阻塞数据读取的情况。

友情提示:executor 线程池,我们一般称之为业务线程池或者逻辑线程池,顾名思义,就是执行业务逻辑的。这样的设计方式,目前 Dubbo 等等 RPC 框架,都采用这种方式。后续,可以认真阅读下《【NIO 系列】——之 Reactor 模型》文章,进一步理解。

6.5、NettyServerConfig

创建 NettyServerConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。

代码如下:

@Configuration

public class NettyServerConfig {

    @Bean

    public MessageDispatcher messageDispatcher() {

        return new MessageDispatcher();

    @Bean

    public MessageHandlerContainer messageHandlerContainer() {

        return new MessageHandlerContainer();

6.6、NettyClientConfig

创建 NettyClientConfig 配置类,创建 MessageDispatcher 和 MessageHandlerContainer Bean。

代码如下:

@Configuration

public class NettyClientConfig {

    @Bean

    public MessageDispatcher messageDispatcher() {

        return new MessageDispatcher();

    @Bean

    public MessageHandlerContainer messageHandlerContainer() {

        return new MessageHandlerContainer();

6.7、本章小结

后续,我们将在如下小节,具体演示消息分发的使用。

七、断开重连

Netty 客户端需要实现断开重连机制,解决各种情况下的断开情况。

例如说:

  • 1)Netty 客户端启动时,Netty 服务端处于挂掉,导致无法连接上;
  • 2)在运行过程中,Netty 服务端挂掉,导致连接被断开;
  • 3)任一一端网络抖动,导致连接异常断开。

具体的代码实现比较简单,只需要在两个地方增加重连机制:

  • 1)Netty 客户端启动时,无法连接 Netty 服务端时,发起重连;
  • 2)Netty 客户端运行时,和 Netty 断开连接时,发起重连。

考虑到重连会存在失败的情况,我们采用定时重连的方式,避免占用过多资源。

7.1、具体代码

① 在 NettyClient 中,提供 #reconnect() 方法,实现定时重连的逻辑。

代码如下:

// NettyClient.java

public void reconnect() {

    eventGroup.schedule(new Runnable() {

        @Override

        publicvoidrun() {

            logger.info("[reconnect][开始重连]");

                start();

            } catch(InterruptedException e) {

                logger.error("[reconnect][重连失败]", e);

    }, RECONNECT_SECONDS, TimeUnit.SECONDS);

    logger.info("[reconnect][{} 秒后将发起重连]", RECONNECT_SECONDS);

通过调用 EventLoop 提供的 #schedule(Runnable command, long delay, TimeUnit unit) 方法,实现定时逻辑。而在内部的具体逻辑,调用 NettyClient 的 #start() 方法,发起连接 Netty 服务端。

又因为 NettyClient 在 #start() 方法在连接 Netty 服务端失败时,又会调用 #reconnect() 方法,从而再次发起定时重连。如此循环反复,知道 Netty 客户端连接上 Netty 服务端。

如下图所示: 

② 在 NettyClientHandler 中,实现 #channelInactive(ChannelHandlerContext ctx) 方法,在发现和 Netty 服务端断开时,调用 Netty Client 的 #reconnect() 方法,发起重连。

代码如下:

// NettyClientHandler.java

@Override

public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    // 发起重连

    nettyClient.reconnect();

    // 继续触发事件

    super.channelInactive(ctx);

7.2、简单测试

① 启动 Netty Client,不要启动 Netty Server,控制台打印日志如下图: 

可以看到 Netty Client 在连接失败时,不断发起定时重连。

② 启动 Netty Server,控制台打印如下图: 

可以看到 Netty Client 成功重连上 Netty Server。

八、心跳机制与空闲检测

我们可以了解到 TCP 自带的空闲检测机制,默认是 2 小时。这样的检测机制,从系统资源层面上来说是可以接受的。

但是在业务层面,如果 2 小时才发现客户端与服务端的连接实际已经断开,会导致中间非常多的消息丢失,影响客户的使用体验。

因此,我们需要在业务层面,自己实现空闲检测,保证尽快发现客户端与服务端实际已经断开的情况。

实现逻辑如下:

  • 1)服务端发现 180 秒未从客户端读取到消息,主动断开连接;
  • 2)客户端发现 180 秒未从服务端读取到消息,主动断开连接。

考虑到客户端和服务端之间并不是一直有消息的交互,所以我们需要增加心跳机制。

逻辑如下:

  • 1)客户端每 60 秒向服务端发起一次心跳消息,保证服务端可以读取到消息;
  • 2)服务端在收到心跳消息时,回复客户端一条确认消息,保证客户端可以读取到消息。

友情提示:

为什么是 180 秒?可以加大或者减小,看自己希望多快检测到连接异常。过短的时间,会导致心跳过于频繁,占用过多资源。

为什么是 60 秒?三次机会,确认是否心跳超时。

虽然听起来有点复杂,但是实现起来并不复杂哈。

8.1、服务端的空闲检测

在 NettyServerHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。

如下图所示:

通过这样的方式,实现服务端发现 180 秒未从客户端读取到消息,主动断开连接。

8.2、客户端的空闲检测

在 NettyClientHandlerInitializer 中,我们添加了一个 ReadTimeoutHandler 处理器,它在超过指定时间未从对端读取到数据,会抛出 ReadTimeoutException 异常。

如下图所示: 

通过这样的方式,实现客户端发现 180 秒未从服务端读取到消息,主动断开连接。

8.3、心跳机制

Netty 提供了 IdleStateHandler 处理器,提供空闲检测的功能,在 Channel 的读或者写空闲时间太长时,将会触发一个 IdleStateEvent 事件。

这样,我们只需要在 NettyClientHandler 处理器中,在接收到 IdleStateEvent 事件时,客户端向客户端发送一次心跳消息。

如下图所示:

其中,HeartbeatRequest 是心跳请求。

同时,我们在服务端项目中,创建了一个 HeartbeatRequestHandler 消息处理器,在收到客户端的心跳请求时,回复客户端一条确认消息。

代码如下:

@Component

public class HeartbeatRequestHandler implementsMessageHandler<HeartbeatRequest> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override

    public void execute(Channel channel, HeartbeatRequest message) {

        logger.info("[execute][收到连接({}) 的心跳请求]", channel.id());

        // 响应心跳

        HeartbeatResponse response = newHeartbeatResponse();

        channel.writeAndFlush(newInvocation(HeartbeatResponse.TYPE, response));

    @Override

    public String getType() {

        return HeartbeatRequest.TYPE;

其中,HeartbeatResponse 是心跳确认响应。

8.4、简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,耐心等待 60 秒后,可以看到心跳日志如下: 

九、认证逻辑

从本小节开始,我们就具体看看业务逻辑的处理示例。

认证的过程,如下图所示:

9.1、AuthRequest

创建 AuthRequest 类,定义用户认证请求。

代码如下:

public class AuthRequest implements Message {

    public static final String TYPE = "AUTH_REQUEST";

     * 认证 Token

    private String accessToken;

    // ... 省略 setter、getter、toString 方法

这里我们使用 accessToken 认证令牌进行认证。

因为一般情况下,我们使用 HTTP 进行登录系统,然后使用登录后的身份标识(例如说 accessToken 认证令牌),将客户端和当前用户进行认证绑定。

9.2、AuthResponse

创建 AuthResponse 类,定义用户认证响应。

代码如下:

public class AuthResponse implements Message {

    public static final String TYPE = "AUTH_RESPONSE";

     * 响应状态码

    private Integer code;

     * 响应提示

    private String message;

    // ... 省略 setter、getter、toString 方法

9.3、AuthRequestHandler

服务端...

创建 AuthRequestHandler 类,为服务端处理客户端的认证请求。

代码如下: 

代码比较简单,看看 <1>、<2>、<3>、<4> 上的注释。

9.4、AuthResponseHandler

客户端...

创建 AuthResponseHandler 类,为客户端处理服务端的认证响应。

代码如下:

@Component

public class AuthResponseHandler implements MessageHandler<AuthResponse> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override

    public void execute(Channel channel, AuthResponse message) {

        logger.info("[execute][认证结果:{}]", message);

    @Override

    public String getType() {

        return AuthResponse.TYPE;

打印个认证结果,方便调试。

9.5、TestController

客户端...

创建 TestController 类,提供 /test/mock 接口,模拟客户端向服务端发送请求。

代码如下:

@RestController

@RequestMapping("/test")

public class TestController {

    @Autowired

    private NettyClient nettyClient;

    @PostMapping("/mock")

    public String mock(String type, String message) {

        // 创建 Invocation 对象

        Invocation invocation = new Invocation(type, message);

        // 发送消息

        nettyClient.send(invocation);

        return "success";

9.6、简单测试

启动 Netty Server 服务端,再启动 Netty Client 客户端,然后使用 Postman 模拟一次认证请求。

如下图所示: 

同时,可以看到认证成功的日志如下:

十一、群聊逻辑

群聊的过程,如下图所示: 

服务端负责将客户端 A 发送的群聊消息,转发给客户端 A、B、C。

友情提示:考虑到逻辑简洁,提供的本小节的示例并不是一个一个群,而是所有人在一个大的群聊中哈~

11.1、ChatSendToAllRequest

创建 ChatSendToOneRequest 类,发送给所有人的群聊消息的请求。

代码如下:

public class ChatSendToAllRequest implements Message {

    public static final String TYPE = "CHAT_SEND_TO_ALL_REQUEST";

     * 消息编号

    private String msgId;

    private String content;

    // ... 省略 setter、getter、toString 方法

PS:如果是正经的群聊,会有一个 groupId 字段,表示群编号。

11.2、ChatSendToAllHandler

服务端...

创建 ChatSendToAllHandler 类,为服务端处理客户端的群聊请求。

代码如下: 

代码比较简单,看看 <1>、<2> 上的注释。

11.3、简单测试

 启动 Netty Server 服务端。

 启动 Netty Client 客户端 A。然后使用 Postman 模拟一次认证请求(用户为 yunai)。

如下图所示:

 启动 Netty Client 客户端 B。注意,需要设置 --server.port 端口为 8081,避免冲突。

 启动 Netty Client 客户端 C。注意,需要设置 --server.port 端口为 8082,避免冲突。 

 最后使用 Postman 模拟一次发送群聊消息。

如下图所示: 

同时,可以看到客户端 A 群发给所有客户端的日志如下:

最后,要想系统地学习IM开发的方方面面,请继续阅读:《新手入门一篇就够:从零开发移动端IM

附录、系列文章

跟着源码学IM(一):手把手教你用Netty实现心跳机制、断线重连机制

跟着源码学IM(二):自已开发IM很难?手把手教你撸一个Andriod版IM

跟着源码学IM(三):基于Netty,从零开发一个IM服务端

跟着源码学IM(四):拿起键盘就是干,教你徒手开发一套分布式IM系统

跟着源码学IM(五):正确理解IM长连接、心跳及重连机制,并动手实现

跟着源码学IM(六):手把手教你用Go快速搭建高性能、可扩展的IM系统

跟着源码学IM(七):手把手教你用WebSocket打造Web端IM聊天

跟着源码学IM(八):万字长文,手把手教你用Netty打造IM聊天》(* 本文)

本文已同步发布于“即时通讯技术圈”公众号。

▲ 本文在公众号上的链接是:点此进入。同步发布链接是:http://www.52im.net/thread-3489-1-1.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK