3

EventMesh源码解析系列(一)之HTTP Server实现

 2 years ago
source link: https://my.oschina.net/webank/blog/5283714
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

EventMesh源码解析系列(一)之HTTP Server实现

HTTP Server实现

首先我们先讲一下HTTP Server启动前的准备工作,也就是HTTP Server的初始化。

1.在初始化的时候,会初始化多个线程池,这个线程池中会有许多的阻塞的线程队列,当某一种事件发生的时候,就会根据事件来使用线程池工厂来为事件创建线程。

【源代码片段】

public void initThreadPool() throws Exception {        // 批处理消息        BlockingQueue<Runnable> batchMsgThreadPoolQueue = new LinkedBlockingQueue<Runnable>(eventMeshHttpConfiguration.eventMeshServerBatchBlockQSize);        batchMsgExecutor = ThreadPoolFactory.createThreadPoolExecutor(eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum,                eventMeshHttpConfiguration.eventMeshServerBatchMsgThreadNum, batchMsgThreadPoolQueue, "eventMesh-batchMsg-", true);        // 发送消息,和上面的类似        BlockingQueue<Runnable> sendMsgThreadPoolQueue=...        // 推送消息,和上面的类似        BlockingQueue<Runnable> pushMsgThreadPoolQueue =...        // 客户端管理,和上面的类似        BlockingQueue<Runnable> clientManageThreadPoolQueue =...        // 管理线程池,和上面的类似        BlockingQueue<Runnable> adminThreadPoolQueue =...        // 回复消息,和上面的类似        BlockingQueue<Runnable> replyMessageThreadPoolQueue =...    }

2.注册http请求处理器,这个处理器会针对http发过来的请求,获取到请求码,根据请求码注册对应的处理器,并且为这个事件用上面的线程池分配一个线程进行处理。

【源代码片段】

 public void registerHTTPRequestProcessor() {       // 新建批量消息处理器        BatchSendMessageProcessor batchSendMessageProcessor = new BatchSendMessageProcessor(this);       // 获取请求码,并且分配一个线程进行处理        registerProcessor(RequestCode.MSG_BATCH_SEND.getRequestCode(),                batchSendMessageProcessor, batchMsgExecutor);        BatchSendMessageV2Processor batchSendMessageV2Processor =...        // 同步消息处理器,和上面的类似        SendSyncMessageProcessor sendSyncMessageProcessor =...        // 异步消息处理器,和上面的类似        SendAsyncMessageProcessor sendAsyncMessageProcessor =...        // 管理指标处理器,和上面的类似        AdminMetricsProcessor adminMetricsProcessor =...        // 心跳处理器,和上面的类似        HeartBeatProcessor heartProcessor =...        // 订阅处理器,和上面的类似        SubscribeProcessor subscribeProcessor =...        // 和上面的类似        UnSubscribeProcessor unSubscribeProcessor =...        // 回复消息处理器,和上面的类似        ReplyMessageProcessor replyMessageProcessor =...    }

3.这里就是http初始化的全部代码。

public class EventMeshHTTPServer extends AbstractHTTPServer {  ...    //初始化    public void init() throws Exception {        logger.info("==================EventMeshHTTPServer Initialing==================");        // 初始化线程组        super.init("eventMesh-http");        // 初始化线程池        initThreadPool();        // 对指标初始化,主要是把生成的指标用于后台数据处理        metrics = new HTTPMetricsServer(this);        metrics.init();        // 消费者管理初始化,主要是把httpServer注册到事件总线上        consumerManager = new ConsumerManager(this);        consumerManager.init();        producerManager = new ProducerManager(this);        producerManager.init();        // 重试        httpRetryer = new HttpRetryer(this);        httpRetryer.init();        // 注册http处理器        registerHTTPRequestProcessor();
        logger.info("--------------------------EventMeshHTTPServer inited");    }    ...}

初始化完成之后,再启动http的服务器端,这里我也同样聊聊以下几点。

1.AbstractHTTPServer的启动,采用的是netty的异步模型框架搭建的。具体来讲,这里创建了两个线程池:bossGroup和workerGroup,前者是用来轮询accept事件并且和client建立连接的,后者是用来轮询read和write事件并且使用handlers处理io事件的。而且这里采用了回调机制,当调用发出后,并不一定立刻就能得到结果,而是在实际处理的时候调用这个组件完成后,通过状态、通知等回调告知调用者。

public abstract class AbstractHTTPServer extends AbstractRemotingServer {  ...    @Override    public void start() throws Exception {        super.start();        Runnable r = () -> {            // 创建服务器端启动的对象            ServerBootstrap b = new ServerBootstrap();            // 不进行加密通话?            SSLContext sslContext = useTLS ? SSLContextFactory.getSslContext() : null;            b.group(this.bossGroup, this.workerGroup)// 设置两个线程组                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器的通道实现                    .childHandler(new HttpsServerInitializer(sslContext))// 设置workerGroup的管道处理器                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);// 保持连接状态            try {                httpServerLogger.info("HTTPServer[port={}] started......", this.port);                // 这里就是对http的端口进行绑定,并且启动服务器端,采用了回调机制                ChannelFuture future = b.bind(this.port).sync();                //关闭通道事件进行监听                future.channel().closeFuture().sync();            } catch (Exception e) {                httpServerLogger.error("HTTPServer start Err!", e);                try {                    // 关闭资源                    shutdown();                } catch (Exception e1) {                    httpServerLogger.error("HTTPServer shutdown Err!", e);                }                return;            }        };
        Thread t = new Thread(r, "eventMesh-http-server");        t.start();        started.compareAndSet(false, true);    }
   ...}

2.设置管道处理器部分,当channel被注册之后,这个类中的initChannel方法就会被调用,也会执行在管道后面加入的handlers。

​​​​​​​

 class HttpsServerInitializer extends ChannelInitializer<SocketChannel> {
        private SSLContext sslContext;
        public HttpsServerInitializer(SSLContext sslContext) {            this.sslContext = sslContext;        }
        @Override        protected void initChannel(SocketChannel channel) throws Exception {            ChannelPipeline pipeline = channel.pipeline();

            if (sslContext != null && useTLS) {                SSLEngine sslEngine = sslContext.createSSLEngine();                sslEngine.setUseClientMode(false);                pipeline.addFirst("ssl", new SslHandler(sslEngine));            }            // 在管道后面加入handlers            pipeline.addLast(new HttpRequestDecoder(),// 这个是对http解码                    new HttpResponseEncoder(),// 这个是对http的响应编码                    new HttpConnectionHandler(),// 这个是对http连接处理                    new HttpObjectAggregator(Integer.MAX_VALUE),// 这个是http对象聚合                    new HTTPHandler());// 这个是http的Handler的具体实现        }    }}

3.这里附上start的源码部分。

public class EventMeshHTTPServer extends AbstractHTTPServer {      ...    public void start() throws Exception {        super.start();        // 指标        metrics.start();        // 消费者管理        consumerManager.start();        // 生产者管理        producerManager.start();        // 重试        httpRetryer.start();        logger.info("--------------------------EventMeshHTTPServer started");    }    ...}

到此,EventMesh的HTTP Server实现部分源码解析就结束了。

注:本文内容由社区小伙伴陈创慧提供,原文地址:https://blog.csdn.net/CodePlayMe/article/details/120671622


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK