Implementing a Socket Server on Android with Kotlin + Netty - Jianshu (简书)

 4 years ago
source link: https://www.jianshu.com/p/9d99d6239e6a?
2019.08.11 20:16:07, 716 words, 1,481 views

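In a recent project, an Android app had to act as the Socket server, with a single port listening for both plain TCP and WebSocket connections.

Netty was the natural choice of framework. When the Netty server receives a message from a client, it runs the corresponding business logic. In some scenarios the server also has to push messages to the client app or web page.

2. Using Netty

2.1 Netty server

First, define NettyServer. It is declared with the object keyword, which makes it a singleton. It is responsible for starting and stopping the Netty server and for sending messages.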

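import android.util.Log
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelOption
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import java.net.InetSocketAddress

object NettyServer {

    private const val TAG = "NettyServer"

    // The client channel that outgoing messages are written to (see selectorChannel)
    private var channel: Channel? = null
    private lateinit var listener: NettyServerListener<String>
    private lateinit var bossGroup: EventLoopGroup
    private lateinit var workerGroup: EventLoopGroup

    var port = 8888

    var webSocketPath = "/ws"

    var isServerStart: Boolean = false
        private set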

    fun start() {
        // bind().sync() and closeFuture().sync() block, so run the server off the main thread
        object : Thread("Netty-Server") {
            override fun run() {
                bossGroup = NioEventLoopGroup(1)
                workerGroup = NioEventLoopGroup()
                try {
                    val b = ServerBootstrap()
                    b.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel::class.java)
                            .localAddress(InetSocketAddress(port))
                            // SO_REUSEADDR belongs on the listening socket, so set it with option()
                            .option(ChannelOption.SO_REUSEADDR, true)
                            .childOption(ChannelOption.SO_KEEPALIVE, true)
                            .childOption(ChannelOption.TCP_NODELAY, true)
                            .childHandler(NettyServerInitializer(listener, webSocketPath))

                    // Bind and start to accept incoming connections.
                    val f = b.bind().sync()
                    Log.i(TAG, NettyServer::class.java.name + " started and listening on " + f.channel().localAddress())

                    isServerStart = true
                    listener.onStartServer()
                    // Block until the server channel is closed.
                    f.channel().closeFuture().sync()
                } catch (e: Exception) {
                    // localizedMessage can be null, so log the throwable itself
                    Log.e(TAG, "Netty server error", e)
                } finally {
                    isServerStart = false
                    listener.onStopServer()

                    disconnect()
                }
            }
        }.start()
    }

    fun disconnect() {
        // The groups are lateinit; guard against disconnect() being called before start()
        if (::workerGroup.isInitialized) workerGroup.shutdownGracefully()
        if (::bossGroup.isInitialized) bossGroup.shutdownGracefully()
    }

    fun setListener(listener: NettyServerListener<String>) {
        this.listener = listener
    }

    // Send a TCP message asynchronously.
    // Returns false when no active channel has been selected.
    fun sendMsgToClient(data: String, listener: ChannelFutureListener): Boolean {
        val ch = channel ?: return false
        if (!ch.isActive) return false
        ch.writeAndFlush(data + System.getProperty("line.separator")).addListener(listener)
        return true
    }

    // Send a TCP message synchronously.
    // This blocks, so never call it from a Netty event-loop thread.
    fun sendMsgToClient(data: String): Boolean {
        val ch = channel ?: return false
        if (!ch.isActive) return false
        return ch.writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly().isSuccess
    }

    // Send a WebSocket message asynchronously.
    fun sendMsgToWS(data: String, listener: ChannelFutureListener): Boolean {
        val ch = channel ?: return false
        if (!ch.isActive) return false
        ch.writeAndFlush(TextWebSocketFrame(data)).addListener(listener)
        return true
    }

    // Send a WebSocket message synchronously.
    // This blocks, so never call it from a Netty event-loop thread.
    fun sendMsgToWS(data: String): Boolean {
        val ch = channel ?: return false
        if (!ch.isActive) return false
        return ch.writeAndFlush(TextWebSocketFrame(data)).awaitUninterruptibly().isSuccess
    }

    /**
     * Switch the channel:
     * choose which client the server communicates with.
     * @param channel
     */
    fun selectorChannel(channel: Channel?) {
        this.channel = channel
    }
}

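NettyServerInitializer is the childHandler applied to each connection once a client has connected to the server:

import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.handler.codec.LineBasedFrameDecoder
import io.netty.handler.codec.string.StringDecoder
import io.netty.handler.codec.string.StringEncoder
import io.netty.util.CharsetUtil

class NettyServerInitializer(private val mListener: NettyServerListener<String>, private val webSocketPath: String) : ChannelInitializer<SocketChannel>() {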

    @Throws(Exception::class)
    public override fun initChannel(ch: SocketChannel) {

        val pipeline = ch.pipeline()

        pipeline.addLast("active",ChannelActiveHandler(mListener))
        pipeline.addLast("socketChoose", SocketChooseHandler(webSocketPath))

        pipeline.addLast("string_encoder",StringEncoder(CharsetUtil.UTF_8))
        pipeline.addLast("linebased",LineBasedFrameDecoder(1024))
        pipeline.addLast("string_decoder",StringDecoder(CharsetUtil.UTF_8))
        pipeline.addLast("commonhandler", CustomerServerHandler(mListener))
    }
}

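NettyServerInitializer installs several handlers: ChannelActiveHandler for new connections, SocketChooseHandler for protocol selection, the StringEncoder, LineBasedFrameDecoder, and StringDecoder used for TCP messages, and CustomerServerHandler, which finally processes the messages.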

ChannelActiveHandler:

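import android.util.Log
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import java.net.InetSocketAddress

@ChannelHandler.Sharable
class ChannelActiveHandler(var mListener: NettyServerListener<String>) : ChannelInboundHandlerAdapter() {

    @Throws(Exception::class)
    override fun channelActive(ctx: ChannelHandlerContext) {

        val insocket = ctx.channel().remoteAddress() as InetSocketAddress
        val clientIP = insocket.address.hostAddress
        val clientPort = insocket.port

        Log.i("ChannelActiveHandler", "New connection: $clientIP : $clientPort")
        mListener.onChannelConnect(ctx.channel())

        // Forward the event so handlers further down the pipeline also see channelActive
        super.channelActive(ctx)
    }

}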

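SocketChooseHandler reads the first bytes of the incoming data to tell WebSocket traffic from plain Socket (TCP) traffic. For WebSocket, it removes the handlers that only the plain Socket protocol needs.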

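import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.ByteToMessageDecoder

class SocketChooseHandler(val webSocketPath: String) : ByteToMessageDecoder() {

    @Throws(Exception::class)
    override fun decode(ctx: ChannelHandlerContext, `in`: ByteBuf, out: MutableList<Any>) {
        val protocol = getBufStart(`in`)
        if (protocol.startsWith(WEBSOCKET_PREFIX)) {
            // PipelineAdd (not shown in this article) installs the WebSocket handlers
            PipelineAdd.websocketAdd(ctx, webSocketPath)

            // WebSocket traffic does not use the line-based TCP codecs
            ctx.pipeline().remove("string_encoder")
            ctx.pipeline().remove("linebased")
            ctx.pipeline().remove("string_decoder")
        }
        // Rewind so the bytes that were peeked at still reach the next handlers
        `in`.resetReaderIndex()
        // The protocol is decided once per connection, so this handler removes itself
        ctx.pipeline().remove(this.javaClass)
    }

    private fun getBufStart(`in`: ByteBuf): String {
        var length = `in`.readableBytes()
        if (length > MAX_LENGTH) {
            length = MAX_LENGTH
        }

        // Mark the reader index so decode() can rewind after peeking
        `in`.markReaderIndex()
        val content = ByteArray(length)
        `in`.readBytes(content)
        // An HTTP request line is ASCII; avoid depending on the platform default charset
        return String(content, Charsets.US_ASCII)
    }

    companion object {
        /** Default length of the protocol marker to inspect: 23 bytes */
        private const val MAX_LENGTH = 23
        /** Prefix of the WebSocket handshake, which starts as an HTTP GET request */
        private const val WEBSOCKET_PREFIX = "GET /"
    }
}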
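
The detection rule used by SocketChooseHandler can be tried out without Netty. The following is a minimal sketch, assuming the first bytes of the connection are available as a byte array; detectProtocol and the Protocol enum are hypothetical names introduced here for illustration and are not part of the demo project. A WebSocket connection opens with an HTTP upgrade request, so its first bytes read "GET /", while anything else is treated as plain TCP.

```kotlin
// Hypothetical helper, not part of the article's demo: classify a connection
// by its first bytes, using the same prefix test as SocketChooseHandler.
private const val WEBSOCKET_PREFIX = "GET /"
private const val MAX_LENGTH = 23

enum class Protocol { WEBSOCKET, TCP }

fun detectProtocol(firstBytes: ByteArray): Protocol {
    // Look at no more than MAX_LENGTH leading bytes
    val length = minOf(firstBytes.size, MAX_LENGTH)
    // An HTTP request line is plain ASCII, so decode with a fixed charset
    val start = String(firstBytes, 0, length, Charsets.US_ASCII)
    return if (start.startsWith(WEBSOCKET_PREFIX)) Protocol.WEBSOCKET else Protocol.TCP
}

fun main() {
    val handshake = "GET /ws HTTP/1.1\r\nUpgrade: websocket\r\n\r\n".toByteArray(Charsets.US_ASCII)
    val tcpLine = "hello\n".toByteArray(Charsets.UTF_8)
    println(detectProtocol(handshake)) // WEBSOCKET
    println(detectProtocol(tcpLine))   // TCP
}
```

One consequence of this rule is that a plain TCP message that itself begins with "GET /" would be mistaken for a WebSocket handshake, so the TCP message format should avoid that prefix.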

StringEncoder, LineBasedFrameDecoder, and StringDecoder are all codecs built into Netty. LineBasedFrameDecoder handles TCP packet sticking and splitting by cutting the byte stream into one frame per line.
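
To make the sticking/splitting problem concrete, here is a minimal sketch of line-based framing in plain Kotlin. LineFramer is a hypothetical stand-in written for this illustration. It is not Netty's implementation, which operates on ByteBuf and also enforces a maximum frame length (1024 in the pipeline above). The idea is the same, though: buffer whatever arrives, and emit a message only when a line terminator has been seen.

```kotlin
// Hypothetical stand-in for illustration only; Netty's LineBasedFrameDecoder
// works on ByteBuf and also enforces a maximum frame length.
class LineFramer {
    private val pending = StringBuilder()

    /** Feed one received chunk; returns every complete line that it finishes. */
    fun feed(chunk: String): List<String> {
        pending.append(chunk)
        val lines = mutableListOf<String>()
        var newline = pending.indexOf("\n")
        while (newline >= 0) {
            // Drop the delimiter and an optional preceding '\r'
            lines.add(pending.substring(0, newline).removeSuffix("\r"))
            pending.delete(0, newline + 1)
            newline = pending.indexOf("\n")
        }
        return lines
    }
}

fun main() {
    val framer = LineFramer()
    // Two messages stuck together, plus the start of a third that is split across reads
    println(framer.feed("hello\nworld\nfoo")) // [hello, world]
    println(framer.feed("bar\r\n"))           // [foobar]
}
```

This is also why every send method in this article appends a line separator to the payload: without it, the receiving side would never see a complete frame.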

CustomerServerHandler:

import android.util.Log
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
import java.net.InetSocketAddress

@ChannelHandler.Sharable
class CustomerServerHandler(private val mListener: NettyServerListener<String>) : SimpleChannelInboundHandler<Any>() {

    override fun exceptionCaught(ctx: ChannelHandlerContext,
                                 cause: Throwable) {
        cause.printStackTrace()
        ctx.close()
    }

    // SimpleChannelInboundHandler releases msg after channelRead0 returns,
    // so reference-counted WebSocket frames are not leaked.
    @Throws(Exception::class)
    override fun channelRead0(ctx: ChannelHandlerContext, msg: Any) {

        if (msg is TextWebSocketFrame) {  // WebSocket text message

            val webSocketInfo = msg.text().trim { it <= ' ' }

            Log.d(TAG, "Received WebSocket message: $webSocketInfo")

            mListener.onMessageResponseServer(webSocketInfo, ctx.channel().id().asShortText())
        } else if (msg is String) {   // plain Socket (TCP) message, already decoded by StringDecoder

            Log.d(TAG, "Received socket message: $msg")

            mListener.onMessageResponseServer(msg, ctx.channel().id().asShortText())
        }
    }

    // Connection closed
    @Throws(Exception::class)
    override fun channelInactive(ctx: ChannelHandlerContext) {
        super.channelInactive(ctx)
        Log.d(TAG, "channelInactive")

        val reAddr = ctx.channel().remoteAddress() as InetSocketAddress
        val clientIP = reAddr.address.hostAddress
        val clientPort = reAddr.port

        Log.d(TAG, "Connection closed: $clientIP : $clientPort")

        mListener.onChannelDisConnect(ctx.channel())
    }

    companion object {

        private val TAG = "CustomerServerHandler"
    }
}

2.2 Netty client

The client likewise needs a NettyTcpClient that can start, stop, and send messages. NettyTcpClient instances are created with the Builder pattern.

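import android.os.SystemClock
import android.util.Log
import io.netty.bootstrap.Bootstrap
import io.netty.channel.Channel
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelFutureListener
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelOption
import io.netty.channel.EventLoopGroup
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.LineBasedFrameDecoder
import io.netty.handler.codec.string.StringDecoder
import io.netty.handler.codec.string.StringEncoder
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.CharsetUtil
import java.util.concurrent.TimeUnit

class NettyTcpClient private constructor(val host: String, val tcp_port: Int, val index: Int) {

    private lateinit var group: EventLoopGroup

    private lateinit var listener: NettyClientListener<String>

    private var channel: Channel? = null

    /**
     * TCP connection status: true while connected
     */
    var connectStatus = false

    /**
     * Maximum number of reconnect attempts
     */
    var maxConnectTimes = Integer.MAX_VALUE
        private set

    private var reconnectNum = maxConnectTimes

    private var isNeedReconnect = true

    var isConnecting = false
        private set

    /**
     * Delay between reconnect attempts, in milliseconds
     */
    var reconnectIntervalTime: Long = 5000
        private set

    /**
     * Heartbeat interval, in seconds
     */
    var heartBeatInterval: Long = 5
        private set

    /**
     * Whether to send heartbeats
     */
    var isSendheartBeat = false
        private set

    /**
     * Heartbeat payload: either a String or a byte[]
     */
    private var heartBeatData: Any? = null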

    fun connect() {
        if (isConnecting) {
            return
        }

        val clientThread = object : Thread("Netty-Client") {
            override fun run() {
                super.run()
                isNeedReconnect = true
                reconnectNum = maxConnectTimes
                connectServer()
            }
        }
        clientThread.start()
    }


    private fun connectServer() {

        synchronized(this@NettyTcpClient) {

            var channelFuture: ChannelFuture? = null

            if (!connectStatus) {
                isConnecting = true
                group = NioEventLoopGroup()
                val bootstrap = Bootstrap().group(group)
                        .option(ChannelOption.TCP_NODELAY, true) // disable Nagle's algorithm
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT_MILLIS)
                        .channel(NioSocketChannel::class.java)
                        .handler(object : ChannelInitializer<SocketChannel>() {

                            @Throws(Exception::class)
                            public override fun initChannel(ch: SocketChannel) {

                                if (isSendheartBeat) {
                                    // Fires userEventTriggered after heartBeatInterval seconds without a write
                                    ch.pipeline().addLast("ping", IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS))
                                }

                                ch.pipeline().addLast(StringEncoder(CharsetUtil.UTF_8))
                                // Split the stream into lines first, then decode each frame to a String;
                                // client and server must agree on this line-based framing
                                ch.pipeline().addLast(LineBasedFrameDecoder(1024))
                                ch.pipeline().addLast(StringDecoder(CharsetUtil.UTF_8))
                                ch.pipeline().addLast(NettyClientHandler(listener, index, isSendheartBeat, heartBeatData))
                            }
                        })

                try {
                    val future = bootstrap.connect(host, tcp_port)
                    channelFuture = future
                    // Read the channel from the future passed to the listener: the listener
                    // can fire before sync() returns
                    future.addListener(ChannelFutureListener { f ->
                        if (f.isSuccess) {
                            Log.d(TAG, "Connected")
                            reconnectNum = maxConnectTimes
                            connectStatus = true
                            channel = f.channel()
                        } else {
                            Log.d(TAG, "Connection failed")
                            connectStatus = false
                        }
                        isConnecting = false
                    }).sync()

                    // Wait until the connection is closed.
                    future.channel().closeFuture().sync()
                    Log.d(TAG, "Disconnected")
                } catch (e: Exception) {
                    e.printStackTrace()
                } finally {
                    connectStatus = false
                    listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, index)

                    if (channelFuture != null) {
                        if (channelFuture.channel() != null && channelFuture.channel().isOpen) {
                            channelFuture.channel().close()
                        }
                    }
                    group.shutdownGracefully()
                    reconnect()
                }
            }
        }
    }


    fun disconnect() {
        Log.d(TAG, "disconnect")
        isNeedReconnect = false
        // group is lateinit; guard against disconnect() being called before connect()
        if (::group.isInitialized) group.shutdownGracefully()
    }

    fun reconnect() {
        Log.d(TAG, "reconnect")
        if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
            reconnectNum--
            SystemClock.sleep(reconnectIntervalTime) // milliseconds
            if (isNeedReconnect && reconnectNum > 0 && !connectStatus) {
                Log.e(TAG, "Reconnecting")
                connectServer()
            }
        }
    }

    /**
     * Send asynchronously
     *
     * @param data the data to send
     * @param listener callback that receives the send result
     * @return true if the write was submitted, false if there is no live connection
     */
    fun sendMsgToServer(data: String, listener: MessageStateListener): Boolean {
        val ch = channel
        if (ch == null || !connectStatus) return false
        ch.writeAndFlush(data + System.getProperty("line.separator")).addListener { channelFuture -> listener.isSendSuccss(channelFuture.isSuccess) }
        return true
    }

    /**
     * Send synchronously. This blocks, so never call it from a Netty event-loop thread.
     *
     * @param data the data to send
     * @return true if the message was written successfully
     */
    fun sendMsgToServer(data: String): Boolean {
        val ch = channel
        if (ch == null || !connectStatus) return false
        return ch.writeAndFlush(data + System.getProperty("line.separator")).awaitUninterruptibly().isSuccess
    }

    fun setListener(listener: NettyClientListener<String>) {
        this.listener = listener
    }

    /**
     * Builder for creating a NettyTcpClient
     */
    class Builder {

        /**
         * Maximum number of reconnect attempts
         */
        private var MAX_CONNECT_TIMES = Integer.MAX_VALUE

        /**
         * Reconnect interval, in milliseconds
         */
        private var reconnectIntervalTime: Long = 5000
        /**
         * Server address
         */
        private var host: String? = null
        /**
         * Server port
         */
        private var tcp_port: Int = 0
        /**
         * Client identifier (there may be several connections)
         */
        private var mIndex: Int = 0

        /**
         * Whether to send heartbeats
         */
        private var isSendheartBeat: Boolean = false
        /**
         * Heartbeat interval, in seconds
         */
        private var heartBeatInterval: Long = 5

        /**
         * Heartbeat payload: either a String or a byte[]
         */
        private var heartBeatData: Any? = null


        fun setMaxReconnectTimes(reConnectTimes: Int): Builder {
            this.MAX_CONNECT_TIMES = reConnectTimes
            return this
        }


        fun setReconnectIntervalTime(reconnectIntervalTime: Long): Builder {
            this.reconnectIntervalTime = reconnectIntervalTime
            return this
        }


        fun setHost(host: String): Builder {
            this.host = host
            return this
        }

        fun setTcpPort(tcp_port: Int): Builder {
            this.tcp_port = tcp_port
            return this
        }

        fun setIndex(mIndex: Int): Builder {
            this.mIndex = mIndex
            return this
        }

        fun setHeartBeatInterval(intervalTime: Long): Builder {
            this.heartBeatInterval = intervalTime
            return this
        }

        fun setSendheartBeat(isSendheartBeat: Boolean): Builder {
            this.isSendheartBeat = isSendheartBeat
            return this
        }

        fun setHeartBeatData(heartBeatData: Any): Builder {
            this.heartBeatData = heartBeatData
            return this
        }

        fun build(): NettyTcpClient {
            val nettyTcpClient = NettyTcpClient(host!!, tcp_port, mIndex)
            nettyTcpClient.maxConnectTimes = this.MAX_CONNECT_TIMES
            nettyTcpClient.reconnectIntervalTime = this.reconnectIntervalTime
            nettyTcpClient.heartBeatInterval = this.heartBeatInterval
            nettyTcpClient.isSendheartBeat = this.isSendheartBeat
            nettyTcpClient.heartBeatData = this.heartBeatData
            return nettyTcpClient
        }
    }

    companion object {
        private val TAG = "NettyTcpClient"
        private val CONNECT_TIMEOUT_MILLIS = 5000
    }
}
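
The reconnect policy in NettyTcpClient (a bounded number of attempts with a fixed delay between them) can also be written as a loop instead of the recursive connectServer/reconnect pair, which keeps the call stack flat when the attempt limit is large. The following is a minimal sketch under that assumption. connectWithRetry and its tryConnect parameter are hypothetical names introduced here, with tryConnect standing in for one blocking connection attempt; this is not the demo's code.

```kotlin
// Hypothetical helper, not part of the demo: tryConnect stands in for one
// blocking connection attempt and reports whether it succeeded.
fun connectWithRetry(maxAttempts: Int, delayMillis: Long, tryConnect: (attempt: Int) -> Boolean): Boolean {
    for (attempt in 1..maxAttempts) {
        if (tryConnect(attempt)) return true
        // Wait before the next attempt, but not after the final failure
        if (attempt < maxAttempts) Thread.sleep(delayMillis)
    }
    return false
}

fun main() {
    // Simulated server that only accepts the third attempt
    val connected = connectWithRetry(maxAttempts = 5, delayMillis = 10) { attempt ->
        println("attempt $attempt")
        attempt == 3
    }
    println(connected) // true
}
```

Like the original, this must run on a background thread, since both the connection attempt and the delay block.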

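The Android client is comparatively simple. Its handlers are: IdleStateHandler for heartbeat support, the handlers for TCP messages (the same StringEncoder, LineBasedFrameDecoder, and StringDecoder as on the server), and NettyClientHandler, which processes the TCP messages that are received.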

NettyClientHandler:

import android.util.Log
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.SimpleChannelInboundHandler
import io.netty.handler.timeout.IdleState
import io.netty.handler.timeout.IdleStateEvent

class NettyClientHandler(private val listener: NettyClientListener<String>, private val index: Int, private val isSendheartBeat: Boolean, private val heartBeatData: Any?) : SimpleChannelInboundHandler<String>() {

    /**
     *
     * The IdleStateHandler in the pipeline watches for write idleness.
     * If nothing has been written for x seconds, it triggers userEventTriggered()
     * with a WRITER_IDLE event, and the heartbeat is sent from here.
     *
     * @param ctx ChannelHandlerContext
     * @param evt IdleStateEvent
     */
    override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {

        if (evt is IdleStateEvent) {
            if (evt.state() == IdleState.WRITER_IDLE) {   // send a heartbeat

                if (isSendheartBeat) {
                    if (heartBeatData == null) {

                        // No payload configured: fall back to a default heartbeat line
                        ctx.channel().writeAndFlush("Heartbeat" + System.getProperty("line.separator")!!)
                    } else {

                        if (heartBeatData is String) {
                            Log.d(TAG, "userEventTriggered: String")
                            ctx.channel().writeAndFlush(heartBeatData + System.getProperty("line.separator")!!)
                        } else if (heartBeatData is ByteArray) {
                            Log.d(TAG, "userEventTriggered: byte")
                            // Raw bytes are sent as-is, without a line separator
                            val buf = Unpooled.copiedBuffer(heartBeatData)
                            ctx.channel().writeAndFlush(buf)
                        } else {

                            Log.d(TAG, "userEventTriggered: heartBeatData type error")
                        }
                    }
                } else {
                    Log.d(TAG, "Heartbeat disabled")
                }
            }
        }
    }

    /**
     *
     * The client has come online
     *
     * @param ctx ChannelHandlerContext
     */
    override fun channelActive(ctx: ChannelHandlerContext) {

        Log.d(TAG, "channelActive")
        listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_SUCCESS, index)
    }

    /**
     *
     * The client has gone offline
     *
     * @param ctx ChannelHandlerContext
     */
    override fun channelInactive(ctx: ChannelHandlerContext) {

        Log.d(TAG, "channelInactive")
    }

    /**
     * The client received a message
     *
     * @param channelHandlerContext ChannelHandlerContext
     * @param msg                   the message
     */
    override fun channelRead0(channelHandlerContext: ChannelHandlerContext, msg: String) {

        Log.d(TAG, "channelRead0:")
        listener.onMessageResponseClient(msg, index)
    }

    /**
     * @param ctx   ChannelHandlerContext
     * @param cause the exception
     */
    override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {

        Log.e(TAG, "exceptionCaught")
        listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, index)
        cause.printStackTrace()
        ctx.close()
    }

    companion object {

        private val TAG = "NettyClientHandler"
    }
}
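
The write-idle rule that drives the heartbeat can be illustrated without Netty. The following is a minimal sketch; HeartbeatClock is a hypothetical class introduced here, and it takes timestamps as plain parameters so the behaviour is easy to follow. In the real client, IdleStateHandler does this bookkeeping with a timer and raises the WRITER_IDLE event.

```kotlin
// Hypothetical, Netty-free illustration of the WRITER_IDLE rule: a heartbeat is
// due once no write has happened for a full interval.
class HeartbeatClock(private val intervalSeconds: Long) {
    private var lastWriteSeconds = 0L

    /** Record that something (data or a heartbeat) was written at [nowSeconds]. */
    fun onWrite(nowSeconds: Long) {
        lastWriteSeconds = nowSeconds
    }

    /** True when the connection has been write-idle for at least one interval. */
    fun heartbeatDue(nowSeconds: Long): Boolean =
        nowSeconds - lastWriteSeconds >= intervalSeconds
}

fun main() {
    val clock = HeartbeatClock(intervalSeconds = 5)
    clock.onWrite(nowSeconds = 0)
    println(clock.heartbeatDue(nowSeconds = 3)) // false
    println(clock.heartbeatDue(nowSeconds = 5)) // true
    clock.onWrite(nowSeconds = 5)               // heartbeat sent, idle period restarts
    println(clock.heartbeatDue(nowSeconds = 7)) // false
}
```

Because any write resets the idle period, a client that is already sending regular data sends no extra heartbeats.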

3. Implementing the demo

3.1 Socket server

Start NettyServer:

    private fun startServer() {

        if (!NettyServer.isServerStart) {
            NettyServer.setListener(this@MainActivity)
            NettyServer.port = port
            NettyServer.webSocketPath = webSocketPath
            NettyServer.start()
        } else {
            NettyServer.disconnect()
        }
    }

NettyServer sends a TCP message asynchronously:

NettyServer.sendMsgToClient(msg, ChannelFutureListener { channelFuture ->
    if (channelFuture.isSuccess) {
        msgSend(msg)
    }
})

NettyServer sends a WebSocket message asynchronously:

NettyServer.sendMsgToWS(msg, ChannelFutureListener { channelFuture ->
    if (channelFuture.isSuccess) {
        msgSend(msg)
    }
})

The demo starts the Socket server through startServer. Before starting it, you can also tap configServer to change the server port and the WebSocket endpoint.

NettyServer1.png

3.2 Socket client

NettyTcpClient is created with the Builder pattern:

            mNettyTcpClient = NettyTcpClient.Builder()
                    .setHost(ip)                    // server address
                    .setTcpPort(port)               // server port
                    .setMaxReconnectTimes(5)        // maximum number of reconnect attempts
                    .setReconnectIntervalTime(5000) // reconnect interval, in milliseconds (passed to SystemClock.sleep)
                    .setSendheartBeat(false)        // whether to send heartbeats
                    .setHeartBeatInterval(5)        // heartbeat interval, in seconds
                    .setHeartBeatData("I'm is HeartBeatData") // heartbeat payload: a String or a byte[]; the last value set wins
                    .setIndex(0)                    // client identifier (there may be several TCP connections)
                    .build()

            mNettyTcpClient.setListener(this@MainActivity) // set the TCP listener

Open and close the client connection:

    private fun connect() {
        Log.d(TAG, "connect")
        if (!mNettyTcpClient.connectStatus) {
            mNettyTcpClient.connect() // connect to the server
        } else {
            mNettyTcpClient.disconnect()
        }
    }

NettyTcpClient sends a TCP message to the server asynchronously:

mNettyTcpClient.sendMsgToServer(msg, object : MessageStateListener {
    override fun isSendSuccss(isSuccess: Boolean) {
        if (isSuccess) {
            msgSend(msg)
        }
    }
})

Before connecting, the demo's client app also lets you tap configClient to change the IP address and port of the server to connect to.

NettyTcpClientpng.png
NettyServer2.png

WebSocket can be tested with: http://www.websocket-test.com/

The Netty server communicating with a web page:

NettyServer3.png

The online WebSocket test page:

websocket_test.png

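With Kotlin's language features and the Netty framework, we have implemented a Socket server on Android as well.

The demo for this article is on GitHub: https://github.com/fengzhizi715/Netty4Android

The example in this article is deliberately simple and only sends plain text messages. In a real production environment the message format would more likely be JSON, because JSON is more flexible: the receiver parses the JSON to get at the message content.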
