9

CrazyIM学习笔记三【服务端登录流程】

 3 years ago
source link: http://bboyjing.github.io/2021/03/20/CrazyIM%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0%E4%B8%89%E3%80%90%E6%9C%8D%E5%8A%A1%E7%AB%AF%E7%99%BB%E5%BD%95%E6%B5%81%E7%A8%8B%E3%80%91/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

  登录的流程,从端到端的角度来说,包括如下环节:

  1. 客户端发送登录数据包。
  2. 服务端进行用户信息验证。
  3. 服务器端创建Session会话。
  4. 服务器端返回登录结果的信息给客户端,包括成功标志、Session ID等。

本章节将实现服务端登录的相关逻辑,其中涉及的主要模块如下:

  • Handler模块:客户端请求的处理
  • Processor模块:以异步的方式完成请求的业务逻辑处理
  • Session模块:管理用户与通道的绑定关系

首先创建chatserver模块,服务端代码大多在该模块中实现。

ProtoBufBuilder模块

  该模块用于存放消息的Bean。

  User类是服务端和客户端共用的类,放到chatcommon模块中,先把成员变量写上,后续使用到具体方法再添加:

@Builder
@Data
@Slf4j
public class User implements Serializable {
String uid;
String devId;
String token;
String nickName;
transient PlatForm platform;
int intPlatFrom;
private String sessionId;
public static User fromMsg(ProtoMsg.LoginRequest info) {
User user = User.builder()
.uid(info.getUid())
.devId(info.getDeviceId())
.token(info.getToken())
.intPlatFrom(info.getPlatform()).build();
log.info("登录中: {}", user.toString());
return user;
@AllArgsConstructor
public enum PlatForm {
WINDOWS(1, "windows"),
MAC(2, "mac"),
ANDROID(3, "android"),
IOS(4, "ios"),
WEB(5, "web"),
UNKNOWN(6, "unknown");
@Getter
private int code;
@Getter
private String msg;
public static String getMsg(Byte code) {
for (PlatForm item : values()) {
if (item.getCode() == code) {
return item.getMsg();
return "";

LoginResponseBuilder

@Service
public class LoginResponseBuilder {
public ProtoMsg.Message loginResponse(ProtoInstant.ResultCodeEnum en, long seqId, String sessionId) {
ProtoMsg.Message.Builder mb = ProtoMsg.Message.newBuilder()
.setType(ProtoMsg.HeadType.LOGIN_RESPONSE) // 设置消息类型
.setSequence(seqId)
.setSessionId(sessionId); // 设置应答流水,与请求对应
ProtoMsg.LoginResponse.Builder b = ProtoMsg.LoginResponse.newBuilder()
.setCode(en.getCode())
.setInfo(en.getDesc())
.setExpose(1);
mb.setLoginResponse(b.build());
return mb.build();

ChatMsgBuilder

@Service
public class ChatMsgBuilder {
public ProtoMsg.Message chatResponse(long seqId, ProtoInstant.ResultCodeEnum en) {
ProtoMsg.Message.Builder mb = ProtoMsg.Message.newBuilder()
.setType(ProtoMsg.HeadType.MESSAGE_RESPONSE) //设置消息类型
.setSequence(seqId); //设置应答流水,与请求对应
ProtoMsg.MessageResponse.Builder rb =
ProtoMsg.MessageResponse.newBuilder()
.setCode(en.getCode())
.setInfo(en.getDesc())
.setExpose(1);
mb.setMessageResponse(rb.build());
return mb.build();

Session模块

  Session模块是基础,先构建出来,其中有两个重要的类。

SessionMap

  用来保存所有的ServerSession:

@Slf4j
public class SessionMap {
private SessionMap() {
private static SessionMap singleInstance = new SessionMap();
public static SessionMap inst() {
return singleInstance;
// 会话集合
private ConcurrentHashMap<String, ServerSession> map = new ConcurrentHashMap<>();
* 增加session对象
public void addSession(String sessionId, ServerSession s) {
map.put(sessionId, s);
log.info("用户登录:id= " + s.getUser().getUid() + " 在线总数: " + map.size());
* 获取session对象
public ServerSession getSession(String sessionId) {
return map.getOrDefault(sessionId, null);
* 根据用户id,获取session对象
public List<ServerSession> getSessionsBy(String userId) {
List<ServerSession> list = map.values()
.stream()
.filter(s -> s.getUser().getUid().equals(userId))
.collect(Collectors.toList());
return list;
* 删除session
public void removeSession(String sessionId) {
if (!map.containsKey(sessionId)) {
return;
ServerSession s = map.get(sessionId);
map.remove(sessionId);
log.info("用户下线:id= " + s.getUser().getUid() + " 在线总数: " + map.size());
* 判断用户是否登录
* @param user
* @return
public boolean hasLogin(User user) {
Iterator<Map.Entry<String, ServerSession>> it = map.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, ServerSession> next = it.next();
User u = next.getValue().getUser();
if (u.getUid().equals(user.getUid()) && u.getPlatform().equals(user.getPlatform())) {
return true;
return false;

ServerSession

  为每个登录的用户维护一个ServerSession:

@Slf4j
public class ServerSession {
public static final AttributeKey<String> KEY_USER_ID = AttributeKey.valueOf("key_user_id");
public static final AttributeKey<ServerSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");
* 用户实现服务端会话管理的核心
private Channel channel;
@Getter
private User user;
* session唯一标示
@Getter
private final String sessionId;
* 登录状态
@Getter
private boolean login = false;
* session中存储的session 变量属性值
private Map<String, Object> map = new HashMap<String, Object>();
public ServerSession(Channel channel) {
this.channel = channel;
this.sessionId = this.buildNewSessionId();
* 从通道中获取session
* @param ctx
* @return
public static ServerSession getSession(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
return channel.attr(ServerSession.SESSION_KEY).get();
private String buildNewSessionId() {
String uuid = UUID.randomUUID().toString();
return uuid.replaceAll("-", "");
// 写ProtoBuf数据帧
public synchronized void writeAndFlush(Object pkg) {
channel.writeAndFlush(pkg);
public void setUser(User user) {
this.user = user;
user.setSessionId(sessionId);
* 和channel 通道实现双向绑定
* @return
public ServerSession bind() {
log.info(" ServerSession 绑定会话 " + channel.remoteAddress());
channel.attr(ServerSession.SESSION_KEY).set(this);
SessionMap.inst().addSession(getSessionId(), this);
login = true;
return this;
// 关闭连接
public static void closeSession(ChannelHandlerContext ctx) {
ServerSession session = ctx.channel().attr(ServerSession.SESSION_KEY).get();
if (null != session && session.isValid()) {
session.close();
SessionMap.inst().removeSession(session.getSessionId());
//关闭连接
public synchronized void close() {
ChannelFuture future = channel.close();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("CHANNEL_CLOSED error ");
public boolean isValid() {
return getUser() != null ? true : false;

Processor模块

  processor模块有一个基础接口和一个抽象类:

public interface ServerProcessor {
* 获取消息类型
* @return
ProtoMsg.HeadType type();
* 定义Processor的行为
* @param session
* @param proto
* @return
boolean action(ServerSession session, ProtoMsg.Message proto);
public abstract class AbstractServerProcessor implements ServerProcessor {
* 获取通道的KEY_USER_ID属性值
* @param ch
* @return
protected String getKey(Channel ch) {
return ch.attr(ServerSession.KEY_USER_ID).get();
* 给通道的KEY_USER_ID属性设值
* @param ch
* @param key
protected void setKey(Channel ch, String key) {
ch.attr(ServerSession.KEY_USER_ID).set(key);
* 判断是否
* @param ch
* @throws Exception
protected void checkAuth(Channel ch) throws Exception {
if (null == getKey(ch)) {
throw new Exception("此用户,没有登录成功");

LoginProcessor

  登录处理逻辑:

@Slf4j
@Service
public class LoginProcessor extends AbstractServerProcessor {
@Autowired
private LoginResponseBuilder loginResponseBuilder;
@Override
public ProtoMsg.HeadType type() {
return ProtoMsg.HeadType.LOGIN_REQUEST;
@Override
public boolean action(ServerSession session, ProtoMsg.Message proto) {
// 取出token验证
ProtoMsg.LoginRequest info = proto.getLoginRequest();
long seqNo = proto.getSequence();
User user = User.fromMsg(info);
if (!checkUser(user)) {
// 已经处于登录状态
ProtoInstant.ResultCodeEnum resultcode = ProtoInstant.ResultCodeEnum.NO_TOKEN;
// 构造登录失败的报文
ProtoMsg.Message response = loginResponseBuilder.loginResponse(resultcode, seqNo, "-1");
// 发送登录失败的报文
session.writeAndFlush(response);
return false;
session.setUser(user);
session.bind();
// 登录成功
ProtoInstant.ResultCodeEnum resultcode = ProtoInstant.ResultCodeEnum.SUCCESS;
//构造登录成功的报文
ProtoMsg.Message response = loginResponseBuilder.loginResponse(resultcode, seqNo, session.getSessionId());
//发送登录成功的报文
session.writeAndFlush(response);
return true;
private boolean checkUser(User user) {
if (SessionMap.inst().hasLogin(user)) {
return false;
* 校验用户,比较耗时的操作,需要100 ms以上的时间
* 方法1:调用远程用户restfull 校验服务
* 方法2:调用数据库接口校验
return true;

Concurrent模块

  因为Handler中的业务Processor都是异步处理的,先在comon模块中添加一些多线程相关的工具类。首先在common模块中添加sl4j依赖:

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>

CallbackTask

  支持回调的任务接口:

public interface CallbackTask<R> {
R execute() throws Exception;
void onSuccess(R r);
void onFailure(Throwable t);

CallbackTaskScheduler

  支持回调的异步任务,首先引入guava依赖:

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>17.0</version>
</dependency>
@Slf4j
public class CallbackTaskScheduler extends Thread {
* 任务队列
private ConcurrentLinkedQueue<CallbackTask> executeTaskQueue = new ConcurrentLinkedQueue<CallbackTask>();
* 线程休眠时间
private long sleepTime = 200;
* 固定10个的线程池
* 用来从队列中获取需要执行的CallbackTask
private ExecutorService jPool = Executors.newFixedThreadPool(10);
* 真正执行任务的线程池
ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool);
private static CallbackTaskScheduler inst = new CallbackTaskScheduler();
* 私有构造函数,直接启动线程
private CallbackTaskScheduler() {
this.start();
@Override
public void run() {
while (true) {
// 处理任务
handleTask();
// 当队列中任务处理完,休眠指定时间
threadSleep(sleepTime);
* 添加任务
* @param executeTask
public static <R> void add(CallbackTask<R> executeTask) {
inst.executeTaskQueue.add(executeTask);
* 线程休眠
* @param time
private void threadSleep(long time) {
sleep(time);
} catch (InterruptedException e) {
log.error(e.getLocalizedMessage());
* 处理任务队列,检查其中是否有任务
* 如果有任务交给ListeningExecutorService执行
private void handleTask() {
CallbackTask executeTask = null;
while (executeTaskQueue.peek() != null) {
executeTask = executeTaskQueue.poll();
handleTask(executeTask);
} catch (Exception e) {
log.error(e.getLocalizedMessage());
* 执行任务操作
* @param executeTask
private <R> void handleTask(CallbackTask<R> executeTask) {
// 提交任务,返回Future
ListenableFuture<R> future = gPool.submit(() -> executeTask.execute());
// Future添加回调,当有结果是调用FutureCallback实现的函数
Futures.addCallback(future, new FutureCallback<R>() {
@Override
public void onSuccess(R r) {
executeTask.onSuccess(r);
@Override
public void onFailure(Throwable t) {
executeTask.onFailure(t);

Handler模块

 该模块处理Netty入站消息。

LoginRequestHandler

@Slf4j
@Service
@ChannelHandler.Sharable
public class LoginRequestHandler extends ChannelInboundHandlerAdapter {
@Autowired
private LoginProcessor loginProcessor;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
// 取得请求类型
ProtoMsg.HeadType headType = pkg.getType();
if (!headType.equals(loginProcessor.type())) {
super.channelRead(ctx, msg);
return;
// 生成ServerSession
ServerSession session = new ServerSession(ctx.channel());
//异步任务,处理登录的逻辑
CallbackTaskScheduler.add(new CallbackTask<Boolean>() {
@Override
public Boolean execute() throws Exception {
// 调用登录逻辑
return loginProcessor.action(session, pkg);
//异步任务返回
@Override
public void onSuccess(Boolean r) {
if (r) {
// 登录成功,移除LoginRequestHandler
ctx.pipeline().remove(LoginRequestHandler.this);
log.info("登录成功:" + session.getUser());
} else {
// 登录失败,关闭连接、移除ServerSession
ServerSession.closeSession(ctx);
log.info("登录失败:" + session.getUser());
//异步任务异常
@Override
public void onFailure(Throwable t) {
// 登录异常,关闭连接、移除ServerSession
ServerSession.closeSession(ctx);
log.info("登录失败:" + session.getUser());

ServerExceptionHandler

  处理异常的Handler:

@Slf4j
@ChannelHandler.Sharable
@Service
public class ServerExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 如果是编、解码异常则关闭连接
if (cause instanceof InvalidFrameException) {
log.error(cause.getMessage());
ServerSession.closeSession(ctx);
} else {
// 捕捉异常信息
log.error(cause.getMessage());
ctx.close();
* 通道 Read 读取 Complete 完成
* 做刷新操作 ctx.flush()
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ServerSession.closeSession(ctx);

HeartBeatServerHandler

  用来处理心跳消息:

@Slf4j
public class HeartBeatServerHandler extends IdleStateHandler {
private static final int READ_IDLE_GAP = 150;
public HeartBeatServerHandler() {
super(READ_IDLE_GAP, 0, 0, TimeUnit.SECONDS);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
// 判断消息类型
ProtoMsg.HeadType headType = pkg.getType();
if (headType.equals(ProtoMsg.HeadType.KEEPALIVE_REQUEST)) {
// 异步处理,将心跳包改成KEEPALIVE_RESPONSE的消息类型 ,回复给客户端
FutureTaskScheduler.add(() -> {
if (ctx.channel().isActive()) {
log.info("收到 HEART_BEAT 消息 from client");
ProtoMsg.Message keepaliveResponse = HeartBeatMsgBuilder.buildKeepAliveResponse(pkg.getSequence(), pkg.getHeartBeat());
ctx.writeAndFlush(keepaliveResponse);
// 如果不调用,IdleStateHandler的入站空闲检测将会调用不到
super.channelRead(ctx, msg);
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
System.out.println(READ_IDLE_GAP + "秒内未读到数据,关闭连接");
ServerSession.closeSession(ctx);

  基础模块完成的差不多了,现在将它们通过Netty、以及SpringBoot给组装起来。

ChatServer

  Netty服务端实现,首先添加配置文件application.yml:

server:
port: 8080
@Slf4j
@Service("ChatServer")
public class ChatServer {
// 服务器端口
@Value("${server.port}")
private int port;
@Autowired
private LoginRequestHandler loginRequestHandler;
@Autowired
private ServerExceptionHandler serverExceptionHandler;
public void run() {
// 启动引导器
ServerBootstrap bootstrap = new ServerBootstrap();
// 采用NioEventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
// 1 设置reactor 线程组
bootstrap.group(bossGroup, workGroup);
// 2 设置nio类型的channel
bootstrap.channel(NioServerSocketChannel.class);
// 3 设置监听端口
bootstrap.localAddress(new InetSocketAddress(port));
// 4 设置通道选项
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 5 装配流水线
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
// 有连接到达时会创建一个channel
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 管理pipeline中的Handler
ch.pipeline().addLast(new ProtoBufDecoder());
ch.pipeline().addLast(new ProtoBufEncoder());
ch.pipeline().addLast(new HeartBeatServerHandler());
// 在流水线中添加handler来处理登录,登录后删除
ch.pipeline().addLast(loginRequestHandler);
ch.pipeline().addLast(serverExceptionHandler);
* 6 开始绑定server
* 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = bootstrap.bind().sync();
log.info("疯狂创客圈 CrazyIM 服务启动, 端口 " + channelFuture.channel().localAddress());
* 7 监听通道关闭事件
* 应用程序会一直等待,直到channel关闭
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
* 8 优雅关闭EventLoopGroup
* 释放掉所有资源包括创建的线程
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();

ServerApplication

SpringBoot启动类:

@SpringBootApplication
public class ServerApplication {
public static void main(String[] args) {
// 启动并初始化 Spring 环境及其各 Spring 组件
ApplicationContext context = SpringApplication.run(ServerApplication.class, args);
ChatServer nettyServer = context.getBean(ChatServer.class);
nettyServer.run();

  至此服务端的登录功能完成了,启动ServerApplication,成功日志如下:

2021-03-20 16:08:27.981 INFO 48628 --- [ main] cn.didadu.chatserver.ServerApplication : Starting ServerApplication using Java 1.8.0_202 on zhangjingdeMacBook-Pro.local with PID 48628 (/Users/zhangjing/IdeaProjects/crazyIM/chatserver/target/classes started by zhangjing in /Users/zhangjing/IdeaProjects/crazyIM)
2021-03-20 16:08:27.987 INFO 48628 --- [ main] cn.didadu.chatserver.ServerApplication : No active profile set, falling back to default profiles: default
2021-03-20 16:08:29.186 INFO 48628 --- [ main] cn.didadu.chatserver.ServerApplication : Started ServerApplication in 2.142 seconds (JVM running for 3.03)
2021-03-20 16:08:29.550 INFO 48628 --- [ main] cn.didadu.chatserver.server.ChatServer : 疯狂创客圈 CrazyIM 服务启动, 端口 /0:0:0:0:0:0:0:0:8080

下面将实现客户端的登录功能,届时再进行调试。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK