119

Thrift(Java版)到网络编程(三)—异步和NIO

 6 years ago
source link: https://zhuanlan.zhihu.com/p/31110044?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Thrift(Java版)到网络编程(三)—异步和NIO

前两篇啰嗦了很多,但也都是些基础玩法。作者也非常成功的绕过了非阻塞和异步这两个有点让人头疼的概念。写到第三篇,发现已经没法再回避了。所以,本文中作者试图以猿类最朴素的语言把这几个概念讲明白,并结合Thrift相关实现给大家一个更贴近实战的认知。

本文讨论的同步和异步是针对通信模式来说的,而阻塞和非阻塞是对IO(更多是网络IO)来说。当然IO也有同步和异步(AIO)的概念。因为篇幅和认知有限,本文估计很少谈到异步IO(本文讨论范畴基本都是同步IO)。

我们一般讲异步的时候,更多是站在调用方角度说的。最简单的一个例子就是老板把一个任务交给你,他自己继续看报纸。他可能过一会儿就过来问你一下是否已经完成任务了,一直到你完成为止。当然更多时候等你做完了去告诉他。而同步则是老板问你今天写了几个bug,在你回答之前老板一直盯着你。

一个简单的异步的例子:

public class AsyncTestSimple {
  static Callable<Integer> task1 = new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
      MyLogger.println("slave is mining...");
      Thread.sleep(1000);
      return Integer.valueOf(1);
    }
  };
  public static void main(String args[]) {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    Future<Integer> future1 = executorService.submit(task1);
    MyLogger.println(future1.isDone());
    MyLogger.println("Boss is counting money...");
    Integer result1 = Integer.valueOf(0);
    try {
      result1 = future1.get();
    } catch (Exception e) {
      e.printStackTrace();
    }
    MyLogger.println(future1.isDone());
    MyLogger.println("get new coins: " + (result1));
    executorService.shutdown();
  }
}

综上,当一个任务比较耗且你又不需要立等结果时,可以考虑引入异步了。当然,异步也并非意味着一定需要返回结果。比如你把一个任务扔到MQ,后续处理你又不关心,这个也是可以的。

异步本身无法提高任何运算效率。但是目前计算机都是多核的,所以如果瓶颈是计算资源的时候,那么异步就可以通过多线程(多机),更好的利用多核资源了。

但如果瓶颈是IO,那么异步的意义有多大呢?别急,这时候一定要引入另一个概念,他就是非阻塞IO。

非阻塞IO

网络通信,很多时候瓶颈都会在网络IO上:可能是带宽,可能是连接资源(一般是FD,阻塞模式下还有对应的是线程资源)。前人根据实战经验,总结出了主要的五种IO模型(Linux下)。这里我就不再展开了。推荐此文,写得不错哦。看完上面这一篇,我觉得应付面试应该问题不大了。从别处盗个图如下:

非阻塞(nonblocking)这个词其实是非常具有混淆性。不管上面说的非阻塞还是IO多路复用,本质上真正的IO操作其实都是阻塞的。唯一区别是在数据准备好之前nonblocking实际上是不断去check,所以这部分时间可以认为是没有阻塞。而IO多路复用则是全程阻塞。只是check阶段的阻塞是多个连接可以阻塞在单个线程上,由这一个线程去check多个连接事件,从而节省了线程资源。

有人说NIO是Nonblocking IO的缩写,这是不对的。NIO其实是NewIO(1.4版本引入),用以跟之前的版本的java IO库区分(因为NIO也可以工作在阻塞模式)。另外,NIO在linux上实际上是基于IO多路复用(epoll)机制实现的,而不是简单的nonblocking模型(以上个人观点,欢迎反驳)。当然其主要卖点确实是非阻塞,所以这个问题也不是很大。

既然是New IO,自然是有一些新的概念。不过想把NIO全部说清楚估计得写本书。有一篇老外写的,简单明了,大家可以看看。对NIO还没有太多概念的同学建议你还是先看看上面这一篇。

本文想讨论了都是网络编程范围的,所以先看一个基于NIO的网络通信的栗子。Server端:

public class NioStringServer extends Thread {
  private InetSocketAddress inetSocketAddress;
  private ServerHandler handler = new ServerHandler();
  public NioStringServer(String hostname, int port) {
    inetSocketAddress = new InetSocketAddress(hostname, port);
  }
  @Override
  public void run() {
    try {
      Selector selector = Selector.open(); // 打开选择器
      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打开通道
      //如果设置为阻塞模式,当register selector时会抛IllegalBlockingModeException异常
      serverSocketChannel.configureBlocking(false);
      serverSocketChannel.socket().bind(inetSocketAddress);
      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 向通道注册选择器和对应事件标识
      System.out.println("Server: socket server started.");
      while (true) {
        int nKeys = selector.select();
        if (nKeys > 0) {
          Set<SelectionKey> selectedKeys = selector.selectedKeys();
          Iterator<SelectionKey> it = selectedKeys.iterator();
          while (it.hasNext()) {
            SelectionKey key = it.next();
            if (key.isAcceptable()) {
              System.out.println("Server: SelectionKey is acceptable.");
              handler.handleAccept(key);
            } else if (key.isReadable()) {
              System.out.println("Server: SelectionKey is readable.");
              handler.handleRead(key);
            } else if (key.isWritable()) {
              System.out.println("Server: SelectionKey is writable.");
              handler.handleWrite(key);
            }
            it.remove();
          }
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  class ServerHandler {
    public void handleAccept(SelectionKey key) throws IOException {
      ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
      SocketChannel socketChannel = serverSocketChannel.accept();
      System.out.println("Server: accept client socket " + socketChannel);
      socketChannel.configureBlocking(false);
      socketChannel.register(key.selector(), SelectionKey.OP_READ);
    }
    public void handleRead(SelectionKey key) throws IOException {
      ByteBuffer byteBuffer = ByteBuffer.allocate(512);
      SocketChannel socketChannel = (SocketChannel) key.channel();
      while (true) {
        int readBytes = socketChannel.read(byteBuffer);
        if (readBytes > 0) {
          System.out.println("Server: readBytes = " + readBytes);
          System.out.println("Server: data = " + new String(byteBuffer.array(), 0, readBytes));
          byteBuffer.put(" ~ ~ ~".getBytes());
          byteBuffer.flip();
          try {
            Thread.sleep(10000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
          socketChannel.write(byteBuffer);
          break;
        }
      }
      socketChannel.close();
    }
    public void handleWrite(SelectionKey key) throws IOException {
      ByteBuffer byteBuffer = (ByteBuffer) key.attachment();
      byteBuffer.flip();
      SocketChannel socketChannel = (SocketChannel) key.channel();
      socketChannel.write(byteBuffer);
      if (byteBuffer.hasRemaining()) {
        key.interestOps(SelectionKey.OP_READ);
      }
      byteBuffer.compact();
    }
  }

  public static void main(String[] args) {
    NioStringServer server = new NioStringServer("127.0.0.1", 9002);
    server.start();
  }
}

代码稍微有点长,这也是非阻塞模式带来的额外成本(这也为Netty等框架提供了一个炫技术的机会,本文继续focus在thrift)。这里主要有四个概念: Selector, ServerSocketChannel, SelectionKey和ByteBuffer。

大致流程是:

  1. Open一个Selector(selector的请参考前面讨论的IO多路复用里的selector机制);
  2. 新建一个ServerSocketChannel ,并使它处于非阻塞模式下(如果是阻塞模式下,后续注册selector会抛异常);
  3. 将selector和对应的事件,注册到该channel,监听channel上的事件(理论上是可以多个事件,但是ServerSocketChannel只支持OP_ACCEPT);
  4. 当事件到来之前当前线程会阻塞在select操作上;
  5. 当一个事件到来时,通过selector.selectedKeys()方法可以获取该事件对应的SelectionKeys.可以把SelectionKey理解为一种事件和通道之间的关联关系或通道上的事件的token。有四种SelectionKey: connect, accept, read和write。这里就不解释了,顾名思义。通过key我们可以获取其关联的channel。高级一点还可以附带一个object。
  6. 当得到channel对应的事件之后,我们就可以对此channel进行读写操作了。当然读写一般来说都是通过buffer进行的。本例用的是个ByteBuffer。
  7. 好像也不复杂。再看看client代码:
public class NioStringClient {
  private InetSocketAddress inetSocketAddress;
  public NioStringClient(String hostname, int port) {
    inetSocketAddress = new InetSocketAddress(hostname, port);
  }
  public void send(String requestData) {
    try {
      SocketChannel socketChannel = SocketChannel.open(inetSocketAddress);
      socketChannel.configureBlocking(false);
      ByteBuffer byteBuffer = ByteBuffer.allocate(512);
      socketChannel.write(ByteBuffer.wrap(requestData.getBytes())); //write
      while (true) {
        byteBuffer.clear();
        int readBytes = socketChannel.read(byteBuffer);
        if (readBytes > 0) {
          byteBuffer.flip();
          System.out.println("Client: readBytes = " + readBytes);
          System.out.println("Client: data = " + new String(byteBuffer.array(), 0, readBytes));
          socketChannel.close();
          break;
        }
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  public static void main(String[] args) {
    String hostname = "localhost";
    String requestData = "Actions speak louder than words!";
    int port = 9002;
    for (int i = 0; i < 100; i++) {
      Runnable task = new Runnable() {
        @Override
        public void run() {
          new NioStringClient(hostname, port).send(requestData);
        }
      };
      Thread taskThread = new Thread(task);
      taskThread.start();
    }
  }
}

Client代码相对简单很多。主要做的就是write一个message,然后在死循环read服务端写回消息。当然这个read是非阻塞的,所以需要判断其返回值来判断是否读到了数据。代码只用于演示,大家参考看看就好。

从该例子看,运行结果等跟同步的阻塞模式没什么两样。

那我们为什么要用非阻塞呢?我们用非阻塞模式,主要是为了解决阻塞方式中每个连接都需要分配个线程去处理的问题。因为线程是昂贵的:

1)一般认为一个线程什么都不干也会占用1MB以上内存。所以当客户端到服务端连接数超过100W时,不考虑JVM自身限制,一般物理机也很难hold得住了。用线程池模式时,我们一般也就开个几百几千的core线程。当连接数多于线程池上限(加上backlog)时,很可能就连不上了(可以参考本系列第一篇文章)。

2)线程间频繁切换还会占用CPU资源。

常见考虑引入非阻塞IO的场景如客户端到服务端长连接情况(比如游戏、push类、IM类客户端)。所以当客户端到服务端保持很多连接,但是连接对应的IO又没那么繁忙时,非阻塞模式就可以大显身手了。

上面的例子,服务端在handelRead里sleep10s,模拟处理时间。执行结果跟预期一样,没有出现连接拒绝情况。Netstat看一下,10个连接都处于established状态,没毛病。因为我们也没有超时机制,客户端每个10s也能拿到正常结果。

Thrift Server线程模型

前面的栗子有点繁琐,而且如果用到生产环境还有一堆问题。自己写基于NIO的服务,很多时候都是不太明智的,因为市面上已经有Netty、Thrift等成熟的框架,帮大家做了很多事情,能够减少绝大部分工作量,还没不容易犯错。

本文继续基于Thrift讨论。为了能够更好的理解Thrift对NIO非阻塞的支持,下面从服务端线程模型为切入点,整体对比一下Thrift几个Server实现。

我们首先回顾一下前面讨论过的TSimpleServer和TThreadPoolServer。简单画了示意图如下(本来想从网上盗个图,实在找不到,只能自己动手了)。

1. TSimpleServer主要是演示用,所以只有一个线程,把所有事情都干了,而且工作在阻塞模式下。所以当有一个请求在处理时,再来个请求,就会发生连接拒绝了(本后后续讨论都假设backlog是0)。

2. 为了解决TSimpleServer并发能力上的不足,引入了TThreadPoolServer。他对于每个连接都从线程池分配一个线程去处理。从而提高了并发处理能力。当线程数不是很大时(比如10K以内),该模型的效率是最高的。他的问题前面也说过了,当连接数较多时,线程数大小就会变成瓶颈。

3. 为了解决阻塞模式带来的问题,引入了TNonblockingServer。他的实现其实跟作者上面贴的例子很相似:

通过引入NIO及其多路复用能力,解决了连接相关问题。此时连接数一般不会成为系统瓶颈。对于单核且服务端没有其他IO阻塞情况时,该模型可以考虑使用。但这种场景实在是太少了,基本也用不上。

该模型最大的问题也很明显,select,io读写和worker都在一个线程内处理,其并发能力跟TSimpleServer没有任何提升。

4. 我们很自然的想到增加worker线程池,这就是THsHaServer。他跟TThreadPoolServer的最大区别是,当一个连接过来时,建立连接和io读写都不再是由worker线程处理,而是由一个selector线程处理完后再把任务交给工作线程并发处理。而select过程是非阻塞的。

从并发处理能力上,该模型已经接近TThreadPoolServer了,并且不存在连接数问题。

5. 有些场景IO读写本身也会耗费大量CPU,从而使得THsHaServer的select线程会变成性能瓶颈。此时Thrift终于把大杀器搬出来了。他就是TThreadedSelectorServer。他也是目前thrift最高级的Server模型了。相对于THsHaServer,他增加了一个selector线程池来处理IO读写。一个Accept线程专门负责select,当IO事件到来时把它转发给相应selector线程。Selector线程负责从对应的channel读取数据,再把他交给worker线程池处理。待worker处理完后,selector再把相应的response写回相应的channel。

经验上selector线程数跟worker线程数不需要一致,大部分时候io线程都会比worker少。

不同Server端模型之间的对比,大家可以参考此文。总体来说,如果连接数不是很多,TThreadPoolServer性能最优。如果连接数很多,或者不知道该用哪个就可以用TThreadedSelectorServer。贴个性能模拟对比结果:

TThreadedSelectorServer

简单看一个Server端的例子,可配置的参数主要是那两个线程池的大小。

public class ServerRunnerSimpleAsync {
  public static Hello.AsyncIface handler;
  public static Hello.AsyncProcessor processor;
  public static void main(String[] args) {
    System.out.println("server stating ...");
    try {
      handler = new HelloHandlerAsync();
      processor = new Hello.AsyncProcessor<>(handler);
      Runnable server = new Runnable() {
        public void run() {
          threadedSelectorServer(processor);
        }
      };
      new Thread(server).start();
    } catch (Exception x) {
      x.printStackTrace();
    }
  }
  private static void threadedSelectorServer(Hello.AsyncProcessor processor){
    try {
      TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(8406);
      TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(serverTransport)
          .processor(processor)
          .transportFactory(new TFramedTransport.Factory())
          .protocolFactory(new TBinaryProtocol.Factory())
          .selectorThreads(2)
          .workerThreads(10);
      TServer server = new TThreadedSelectorServer(args);
      server.serve();
    } catch (Exception e){
      e.printStackTrace();
    }
  }
}

这里请注意,对于非阻塞的server一定要用TFramedTransport。相应的client也必须用TFramedTransport。

其余代码看了下,好像也没什么好解释的了,跟前面讨论的TThreadPoolServer等没太多区别(前面讲的太到位了哈哈)。当然如果细看实现,这里面还有路由策略,队列等还可以继续研究。

客户端调用,可以是同步调用,也可以是异步调用(注意:异步客户端没法调用阻塞的server api,这是Thrift的限制)。看个异步的例子:

public class ThriftClientDemo {
  public static void main(String[] args) {
      Hello.AsyncClient client = new Hello.AsyncClient(
          new TBinaryProtocol.Factory(),
          new TAsyncClientManager(),
          new TNonblockingSocket("localhost", 8406)
      );
      client.sayHello("Eric", 18, new HelloCallbackHandler());
    } catch (Exception e) {
      e.printStackTrace();
    }
    try {
      Thread.sleep(50000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    return;
  }
}

这里跟同步调用最大的区别是多了一个回调处理:

public class HelloCallbackHandler implements AsyncMethodCallback<Hello.AsyncClient.sayHello_call> {
  public void onComplete(Hello.AsyncClient.sayHello_call response) {
    try {
      MyLogger.println(new Date() + ": " + response.getResult().name + " @ " + response.getResult().address);
    } catch (TException e) {
      e.printStackTrace();
    }
  }
  public void onError(Exception exception) {
    System.out.println("error: " + exception);
  }
}

了解异步的同学,这个也不陌生。就不展开了。而异步的好处前面已经讨论过了。在thrift框架下,如果想异步调用,必须采用非阻塞方式。

单纯的用NIO的client进行同步调用,作者还没有想到那种场景有非常大的实际意义。异步+NIO之后,当前客户端线程可以在注册回调处理函数后,就可以退出或继续处理其他任务了。待IO
ready后继续处理当前任务。这就跟服务端侧的意义就差不多了。通过NIO把连接和处理线程进行了解耦,不再是一一对应了。

拖拖拉拉写了三篇。不管写出来质量如何,作者更享受整理的过程。

一定有很多地方理解不到位或描述不够明确,不正确的。欢迎大家一起讨论、批评。

“大道至简”--前任老大的一次分享主题,深有感触。简单意味着容易上手(不怕人员迭代,对成员要求也没那么搞),容易维护(统一和自动化运维更加可行),容易扩展(迭代速度更快)。做技术的同学总是会有点技术追求,男人又总是喜欢新鲜感。所以经常碰到一些同学把新鲜,晦涩难懂等价为高大上。个人认为适合的才是最好的(废话…)。篇幅原因这里就不再扩展了,有机会作者也想吹一吹架构那套西风。

Thrift还是很轻量级的,相信绝大多数同学花上一小段时间就差不多可以上手了。所以在做服务化尝试的同学可以考虑试用。他虽然他更新已经没那么频繁了,这可能也是他比较稳定了的缘故。当然轻量意味着很多事情他都没做。有得必有失,最终决策还是要看业务和团队当前状态吧。我们在一些中大型工程里实际使用了thrift,目前为止系统稳定性、性能、多语言支持等方面的表现都还是不错的。

写在19大后,帝都。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK