6

基于Netty,从零开发IM(二):编码实践篇(im单聊功能)

 1 year ago
source link: http://www.blogjava.net/jb2011/archive/2022/07/11/450781.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本文由作者“大白菜”分享,个人博客 cmsblogs.cn,有较多修订和改动。注意:本系列是给IM初学者的文章,IM老油条们还望海涵,勿喷!

接上篇《IM系统设计篇》,本篇主要讲解的是通过实战编码实现IM的单聊功能,内容涉及技术原理、编码实践。

1834368-20220711113059363-1229407092.png

补充说明:因为本系列文章主要目的是引导IM初学者在基于Netty的情况下,如何一步一步从零写出IM的逻辑和思维能力,因而为了简化编码实现,本系列中编码实现的客户端都是基于控制台实现的(希望不要被嫌弃),因为理解技术的本质显然比炫酷的外在表现形式更为重要。

(本文已同步发布于:http://www.52im.net/thread-3974-1-1.html

2、写在前面

建议你在阅读本文之前,务必先读本系列首篇《IM系统设计篇》,在着重理解IM系统的理论设计思路之后,再来读实战代码则效果更好。

最后,在开始本文之前,请您务必提前了解Netty的相关基础知识,可从本系列首篇《IM系统设计篇》中的“知识准备”一章开始。

1834368-20220711113108937-1154599065.jpg

3、系列文章

本文是系列文章的第2篇,以下是系列目录:

4、运行效果

本篇我们主要来实现的是IM单聊功能,具体就是:模拟IM聊天的两个用户分别登陆各自的账号,然后可以互相发送聊天消息。

我们提前看一下本篇要实现的功能运行效果。

客户端 1 登陆效果:

1834368-20220711113405935-1901916940.png

客户端 2 登陆效果:

1834368-20220711113412108-1324753916.png

客户端 1 发送消息效果图:

1834368-20220711113420431-1227604585.png

客户端 2 接受消息效果图:

1834368-20220711113426191-460523136.png

5、技术原理

5.1 概述

上节中,可以看到此次实战的运行效果是一个基于 console 控制台的聊天,根据上篇《IM系统设计篇》的思路设计,我们也知道主要核心是服务端保存一份关系映射,通过接受人 ID 找到对应的通道进行消息发送。

但是,我们要想实现具体的功能,则需要大体上做一个核心技术实现步骤的拆解,具体拆分成以下三步。

5.2 第一步: 编码和解码的实现

针对IM单聊功能,有两个核心技术点:

  • 1)一是序列化和反序列化;
  • 2)二是通讯协议实现。

客户端和服务端之间的数据通讯,我们是基于实体对象去交互的,这样数据格式更加的方便。

对于实体对象的序列化和反序列化,推荐使用 Fastjson 框架去实现,而不是Netty官方示例所使用的对象流。

同时为了更加规范的管理不同业务实体,我们需要定义一个实体基类,所有的业务实体都继承它(下面的章节会详细讲解)。

5.3 第二步: 登录和消息发送两个业务点的实现

登录主要是为了让用户 ID 和通道(就是Netty中的Channel,也即网络连接)进行绑定。

在登录成功之后为 Channel 通过 attr() 方法绑定该用户 ID,主要目的有三个:

  • 1)客户端A在发送消息时,服务端可以通过 Channel 获取消息发送者的用户ID,以便知道消息是“谁”发过来的;
  • 2)服务端在收到客户端A发过来的消息时,通过消息中的接收者用户ID,可以获取接收者的Channel,以便知道消息该发给“谁”;
  • 3)在 Channel 断开的时候,服务端可以监听到 Channel,并且获取 Channel 的属性,从而删除对应的用户ID和Chennel映射关系。

对于业务处理来说,用户登录和消息发送是两个不同的业务点,一般来说需要定义多个 Handler 来分别处理,但是这里为了减少 Handler 的数量,统一一个 Handler 处理。

* 友情提示:用户ID和Chennel的绑定,可以参考成熟的开源IM工程 MobileIMSDK  中的实现逻辑 OnlineProcessor.java,以便通过更接近IM产品级实践进行学习。

5.4 第三步: 映射关系的实现

前面也分析过了,服务端需要保存一份用户ID和Channel映射关系,这个映射关系只需要使用一个 Map 进行存储即可,即 Map<Integer,Channel>,其中:key 是用户 ID、value 是 Channel(Channel也就是客户端的网络连接对象啦)。

这部分需要交待的不多,理解清楚用户ID和Channel的关系就足够了。

接下来就是具体的编码实战了。。。

6、实体定义实战

实体的设计,主要从两个方面进行入手考虑:

  • 1)通讯协议的规则,分别是协议标识符、业务指令、数据长度、数据四个部分;
  • 2)不同业务对应不同的字段属性。

具体如下图所示:

1834368-20220711113433999-320323057.png

基础实体:

主要定义 tag 字段,标识协议的标识符,code () 抽象方法,主要表示的是业务指令,不同的业务对应不同的指令。

@Data

public abstract class BaseBean implements Serializable {

    private Integer tag=1;//固定值,标识的是一个协议类型,不同协议对应不同的值

    public abstract Byte code();//业务指令抽象方法

登录请求实体:

@Data

public class LoginReqBean extends BaseBean implements Serializable {

    private Integer userid;//用户ID

    private String username;//用户名称

    public Byte code() {

        return 1;//业务指令

登录响应实体:

@Data

public class LoginResBean extends BaseBean implements Serializable {

    private Integer status;//响应状态,0登录成功,1登录失败

    private String msg;//响应信息

    private Integer userid;//用户ID

     public Byte code() {

        return 2;//业务指令

消息发送实体:

public class MsgReqBean extends BaseBean implements Serializable {

    private Integer fromuserid;//发送人ID

    private Integer touserid;//接受人ID

    private String msg;//发送消息

     public Byte code() {

        return 3;//业务指令

消息响应响应:

public class MsgResBean extends BaseBean implements Serializable {

    private Integer status;//响应状态,0发送成功,1发送失败

    private String msg;//响应信息

     public Byte code() {

        return 4;//业务指令

消息接受实体:

public class MsgRecBean extends BaseBean implements Serializable {

    private Integer fromuserid;//发送人ID

    private String msg;//消息

    public Byte code() {

        return 5;//业务指令

7、编码和解码实战

7.1 依赖坐标

<dependency>

    <groupId>com.alibaba</groupId>

    <artifactId>fastjson</artifactId>

    <version>1.2.47</version>

</dependency>

7.2 编码实现

public class MyEncoder extends MessageToByteEncoder<BaseBean> {

    protected void encode(

        ChannelHandlerContext channelHandlerContext,

        BaseBean baseBean,

        ByteBuf byteBuf) throws Exception {

        //1.把实体序列化成字节数字

        byte[] bytes= JSON.toJSONBytes(baseBean);

        //2.根据协议组装数据

        byteBuf.writeInt(baseBean.getTag());//标识(4个字节)

        byteBuf.writeByte(baseBean.code());//指令(1个字节)

        byteBuf.writeInt(bytes.length);//长度(4个字节)

        byteBuf.writeBytes(bytes);//

7.3 解码实现

public class MyDecoder extends ByteToMessageDecoder {

    protected void decode(

        ChannelHandlerContext channelHandlerContext,

        ByteBuf byteBuf,

        List<Object> list) throws Exception {

        //1.根据协议取出数据

        int tag=byteBuf.readInt();//标识符

        byte code=byteBuf.readByte();//获取指令

        int len=byteBuf.readInt();//获取数据长度

        byte[] bytes=new byte[len];

        byteBuf.readBytes(bytes);

        //2.根据code获取类型

        Class<? extendsBaseBean> c= MapUtils.getBean(code);

        //3.反序列化

        BaseBean baseBean=JSON.parseObject(bytes,c);

        list.add(baseBean);

7.4 指令和实体关系

为什么需要这么一个工具类呢?指令表示的是业务类型,不同的业务对应不同的实体,那么解码的时候,怎么知道反序列化成什么样的实体呢?思路是获取到的指令,再根据指令找到对应的实体即可。

public class MapUtils {

    //1. 自定义指令

    private static Byte codeLoginReq=1;

    private static Byte codeLoginRes=2;

    private static Byte codeMsgReq=3;

    private static Byte codeMsgRes=4;

    private static Byte codeMsgRec=5;

    //2. 自定义一个Map,专门管理指令和实体的关系

    private static Map<Byte, Class<? extends BaseBean>> map=new HashMap<Byte,Class<? extends BaseBean>>();

    //3. 初始化

    static{

        map.put(codeLoginReq, LoginReqBean.class);

        map.put(codeLoginRes, LoginResBean.class);

        map.put(codeMsgReq, MsgReqBean.class);

        map.put(codeMsgRes, MsgResBean.class);

        map.put(codeMsgRec, MsgRecBean.class);

    //4. 根据指令获取对应的实体

    public static Class<? extends BaseBean> getBean(Byte code){

            return map.get(code);

        }catch(Exception e){

            throw new RuntimeException(e.getMessage());

8、客户端代码实战

8.1 Pipeline 管理链表

.handler(new ChannelInitializer<SocketChannel>() {

    @Override

    public void initChannel(SocketChannel ch) {

        //1.拆包器

        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4));

        //2.自定义解码器

        ch.pipeline().addLast(new MyDecoder());

        //3.自定义业务

        ch.pipeline().addLast(new ClientChatHandler());

        //4.自定义编码器

        ch.pipeline().addLast(new MyEncoder());

8.2 业务 Handler

所有的业务处理在同一个 Handler 里面进行处理,通过判断实体类型来区分不同的业务处理。

public class ClientChatHandler extends ChannelInboundHandlerAdapter {

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        //通道就绪时,发起登录请求

        login(ctx.channel());

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //根据msg做类型判断,不同的业务做不同的处理

        if(msg instanceof LoginResBean){

            //1.登录结果响应

            LoginResBean res=(LoginResBean) msg;

            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>登录响应:"+res.getMsg());

            if(res.getStatus()==0){

                //1.登录成功,则给通道绑定属性

                ctx.channel().attr(AttributeKey.valueOf("userid")).set(res.getUserid());

                //2.调用发送消息方法

                sendMsg(ctx.channel());

            }else{

                //1.登录失败,调用登录方法

                login(ctx.channel());

        }elseif(msg instanceof MsgResBean){

            //1.发送消息结果响应

            MsgResBean res=(MsgResBean)msg;

            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>发送响应:"+res.getMsg());

        }else if(msg instanceof MsgRecBean){

            //2.接受消息

            MsgRecBean res=(MsgRecBean)msg;

            System.out.println("fromuserid="+res.getFromuserid()+",msg="+res.getMsg());

    //登录方法

    private void login(Channel channel){

        Scanner scanner=new Scanner(System.in);

        System.out.println(">>用户ID:");

        Integer userid=scanner.nextInt();

        System.out.println(">>用户名称:");

        String username=scanner.next();

        LoginReqBean bean=new LoginReqBean();

        bean.setUserid(userid);

        bean.setUsername(username);

        channel.writeAndFlush(bean);

    //发送消息方法

    private void sendMsg(finalChannel channel){

        final Scanner scanner=new Scanner(System.in);

        new Thread(new Runnable() {

            public void run() {

                while(true){

                    System.out.println(">>接收人ID:");

                    Integer touserid=scanner.nextInt();

                    System.out.println(">>聊天内容:");

                    String msg=scanner.next();

                    MsgReqBean bean=new MsgReqBean();

                    //从通道属性获取发送人ID

                    Integer fromuserid=(Integer) channel.attr(

                        AttributeKey.valueOf("userid")

                    ).get();

                    //发送人ID

                    bean.setFromuserid(fromuserid);

                    //接受人ID

                    bean.setTouserid(touserid);

                    //发送消息

                    bean.setMsg(msg);

                    channel.writeAndFlush(bean);

        }).start();

9、服务端代码实战

9.1 Pipeline 管理链表

.childHandler(new ChannelInitializer<NioSocketChannel>() {

    protected void initChannel(NioSocketChannel ch) {

        //1.拆包器

        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,5,4));

        //2.自定义解码器

        ch.pipeline().addLast(new MyDecoder());

        //3.业务Handler

        ch.pipeline().addLast(new ServerChatHandler());

        //4.自定义编码器

        ch.pipeline().addLast(new MyEncoder());

9.2 业务 Handler

public class ServerChatHandler extends ChannelInboundHandlerAdapter{

    //1.定义一个Map(key是用户ID,value是连接通道)

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if(msg instanceof LoginReqBean){

            //1.登录请求

            login((LoginReqBean) msg,ctx.channel());

        }else if(msg instanceof MsgReqBean){

            //2.发送消息请求

            sendMsg((MsgReqBean)msg,ctx.channel());

    //登录处理方法

    private void login(LoginReqBean bean, Channel channel){

        LoginResBean res=new LoginResBean();

        //从map里面根据用户ID获取连接通道

        Channel c=map.get(bean.getUserid());

        if(c==null){

            //通道为空,证明该用户没有在线

            //1.添加到map

            map.put(bean.getUserid(),channel);

            //2.给通道赋值

            channel.attr(AttributeKey.valueOf("userid")).set(bean.getUserid());

            //3.响应

            res.setStatus(0);

            res.setMsg("登录成功");

            res.setUserid(bean.getUserid());

            channel.writeAndFlush(res);

        }else{

            //通道不为空,证明该用户已经在线了

            res.setStatus(1);

            res.setMsg("该账户目前在线");

            channel.writeAndFlush(res);

    //消息发送处理方法

    private void sendMsg(MsgReqBean bean,Channel channel){

        Integer touserid=bean.getTouserid();

        Channel c=map.get(touserid);

        if(c==null){

            MsgResBean res=new MsgResBean();

            res.setStatus(1);

            res.setMsg(touserid+",不在线");

            channel.writeAndFlush(res);

        }else{

            MsgRecBean res=new MsgRecBean();

            res.setFromuserid(bean.getFromuserid());

            res.setMsg(bean.getMsg());

            c.writeAndFlush(res);

10、本篇小结

本篇主要编码实战了IM的单聊功能,实现思路相对还是稍微有点小复杂。

大家主要核心掌握以下几点思路就可以了:

  • 1)不同的业务可设置不同的实体和指令,需要把指令和实体的关系管理起来,方便反序列化的时候,可以根据指令来反序列化到具体实体;
  • 2)需要把用户 ID 和通道的关系管理起来(方便根据用户ID找到Channel通道,反过来也一样),并且灵活运用 Channel 的 attr () 方法,该方法可以绑定属性值,非常的有用。

11、参考资料

[1] 手把手教你用Netty实现心跳机制、断线重连机制

[2] 自已开发IM很难?手把手教你撸一个Andriod版IM

[3] 基于Netty,从零开发一个IM服务端

[4] 拿起键盘就是干,教你徒手开发一套分布式IM系统

[5] 正确理解IM长连接、心跳及重连机制,并动手实现

[6] 手把手教你用Go快速搭建高性能、可扩展的IM系统

[7] 手把手教你用WebSocket打造Web端IM聊天

[8] 万字长文,手把手教你用Netty打造IM聊天

[9] 基于Netty实现一套分布式IM系统

[10] 基于Netty,搭建高性能IM集群(含技术思路+源码)

[11] SpringBoot集成开源IM框架MobileIMSDK,实现即时通讯IM聊天功能

(本文已同步发布于:http://www.52im.net/thread-3974-1-1.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK