1

浅谈Thin的事件驱动模型

 5 months ago
source link: https://www.ttalk.im/2023/12/ruby-thin-server.html?amp%3Butm_medium=Atom
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一篇很好的介绍Ruby的Thin的服务器的博文,转发分享

在上一篇文章中我们已经介绍了WEBrick的实现,它的 handler 是写在 Rack 工程中的,而在这篇文章介绍的 webserver thin 的 Rack 处理器也是写在 Rack 中的;与 WEBrick 相同,Thin 的实现也非常简单,官方对它的介绍是:

A very fast & simple Ruby web server.

它将 Mongrel、EventMachine 和 Rack 三者进行组合,在其中起到胶水的作用,所以在理解 Thin 的实现的过程中我们也需要分析 EventMachine 到底是如何工作的。

Thin 的实现

在这一节中我们将从源代码来分析介绍 Thin 的实现原理,因为部分代码仍然是在 Rack 工程中实现的,所以我们要从 Rack 工程的代码开始理解 Thin 的实现。

从 Rack 开始

Thin 的处理器 Rack::Handler::Thin 与其他遵循 Rack 协议的 webserver 一样都实现了 .run 方法,接受 Rack 应用和 options 作为输入

module Rack
  module Handler
    class Thin
      def self.run(app, options={})
        environment  = ENV['RACK_ENV'] || 'development'
        default_host = environment == 'development' ? 'localhost' : '0.0.0.0'

        host = options.delete(:Host) || default_host
        port = options.delete(:Port) || 8080
        args = [host, port, app, options]
        args.pop if ::Thin::VERSION::MAJOR < 1 && ::Thin::VERSION::MINOR < 8
        server = ::Thin::Server.new(*args)
        yield server if block_given?
        server.start
      end
    end
  end
end

上述方法仍然会从 options 中取出 ip 地址和端口号,然后初始化一个 Thin::Server 的实例后,执行 #start 方法在 8080 端口上监听来自用户的请求。

初始化服务

Thin 服务的初始化由以下的代码来处理,首先会处理在 Rack::Handler::Thin.run 中传入的几个参数 host、port、app 和 options,将 Rack 应用存储在临时变量中:

From: lib/thin/server.rb @ line 100:
Owner: Thin::Server

def initialize(*args, &block)
  host, port, options = DEFAULT_HOST, DEFAULT_PORT, {}

  args.each do |arg|
    case arg
    when 0.class, /^\d+$/ then port    = arg.to_i
    when String           then host    = arg
    when Hash             then options = arg
    else
      @app = arg if arg.respond_to?(:call)
    end
  end

  @backend = select_backend(host, port, options)
  @backend.server = self
  @backend.maximum_connections            = DEFAULT_MAXIMUM_CONNECTIONS
  @backend.maximum_persistent_connections = DEFAULT_MAXIMUM_PERSISTENT_CONNECTIONS
  @backend.timeout                        = options[:timeout] || DEFAULT_TIMEOUT

  @app = Rack::Builder.new(&block).to_app if block
end

在初始化服务的过程中,总共只做了三件事情,处理参数、选择并配置 backend,创建新的应用:

ruby-thin-server-1.png

处理参数的过程自然不用多说,只是这里判断的方式并不是按照顺序处理的,而是按照参数的类型;在初始化器的最后,如果向初始化器传入了 block,那么就会使用 Rack::Builder 和 block 中的代码初始化一个新的 Rack 应用。

在选择后端时 Thin 使用了#select_backend 方法,这里使用 case 语句替代多个 if、else,也是一个我们可以使用的小技巧:

From: lib/thin/server.rb @ line 261:
Owner: Thin::Server

def select_backend(host, port, options)
  case
  when options.has_key?(:backend)
    raise ArgumentError, ":backend must be a class" unless options[:backend].is_a?(Class)
    options[:backend].new(host, port, options)
  when options.has_key?(:swiftiply)
    Backends::SwiftiplyClient.new(host, port, options)
  when host.include?('/')
    Backends::UnixServer.new(host)
  else
    Backends::TcpServer.new(host, port)
  end
end

在大多数时候,我们只会选择 UnixServer 和 TcpServer 两种后端中的一个,而后者又是两者中使用更为频繁的后端:

From: lib/thin/backends/tcp_server.rb @ line 8:
Owner: Thin::Backends::TcpServer

def initialize(host, port)
  @host = host
  @port = port
  super()
end

From: lib/thin/backends/base.rb @ line 47:
Owner: Thin::Backends::Base

def initialize
  @connections                    = {}
  @timeout                        = Server::DEFAULT_TIMEOUT
  @persistent_connection_count    = 0
  @maximum_connections            = Server::DEFAULT_MAXIMUM_CONNECTIONS
  @maximum_persistent_connections = Server::DEFAULT_MAXIMUM_PERSISTENT_CONNECTIONS
  @no_epoll                       = false
  @ssl                            = nil
  @threaded                       = nil
  @started_reactor                = false
end

初始化的过程中只是对属性设置默认值,比如 host、port 以及超时时间等等,并没有太多值得注意的代码。

在启动服务时会直接调用 TcpServer#start 方法并在其中传入一个用于处理信号的 block:

From: lib/thin/server.rb @ line 152:
Owner: Thin::Server

def start
  raise ArgumentError, 'app required' unless @app
  
  log_info  "Thin web server (v#{VERSION::STRING} codename #{VERSION::CODENAME})"
  log_debug "Debugging ON"
  trace     "Tracing ON"
  
  log_info "Maximum connections set to #{@backend.maximum_connections}"
  log_info "Listening on #{@backend}, CTRL+C to stop"

  @backend.start { setup_signals if @setup_signals }
end

虽然这里的 backend 其实已经被选择成了 TcpServer,但是该子类并没有覆写#start 方法,这里执行的方法其实是从父类继承的:

From: lib/thin/backends/base.rb @ line 60:
Owner: Thin::Backends::Base

def start
  @stopping = false
  starter   = proc do
    connect
    yield if block_given?
    @running = true
  end
  
  # Allow for early run up of eventmachine.
  if EventMachine.reactor_running?
    starter.call
  else
    @started_reactor = true
    EventMachine.run(&starter)
  end
end

上述方法在构建一个 starter block 之后,将该 block 传入 EventMachine.run 方法,随后执行的 #connect 会启动一个 EventMachine 的服务器用于处理用户的网络请求:

From: lib/thin/backends/tcp_server.rb @ line 15:
Owner: Thin::Backends::TcpServer

def connect
  @signature = EventMachine.start_server(@host, @port, Connection, &method(:initialize_connection))
  binary_name = EventMachine.get_sockname( @signature )
  port_name = Socket.unpack_sockaddr_in( binary_name )
  @port = port_name[0]
  @host = port_name[1]
  @signature
end

在 EventMachine 的文档中,.start_server 方法被描述成一个在指定的地址和端口上初始化 TCP 服务的方法,正如这里所展示的,它经常在 .run 方法的 block 中执行;该方法的参数 Connection 作为处理 TCP 请求的类,会实现不同的方法接受各种各样的回调,传入的 initialize_connection block 会在有请求需要处理时对 Connection 对象进行初始化:

Connection 对象继承自 EventMachine::Connection,是 EventMachine 与外界的接口,在 EventMachine 中的大部分事件都会调用 Connection 的一个实例方法来传递数据和参数。

From: lib/thin/backends/base.rb @ line 145:
Owner: Thin::Backends::Base

def initialize_connection(connection)
  connection.backend                 = self
  connection.app                     = @server.app
  connection.comm_inactivity_timeout = @timeout
  connection.threaded                = @threaded
  connection.start_tls(@ssl_options) if @ssl

  if @persistent_connection_count < @maximum_persistent_connections
    connection.can_persist!
    @persistent_connection_count += 1
  end
  @connections[connection.__id__] = connection
end

处理请求的连接

Connection 类中有很多的方法 #post_init、#receive_data 方法等等都是由 EventMachine 在接收到请求时调用的,当 Thin 的服务接收到来自客户端的数据时就会调用 #receive_data 方法:

From: lib/thin/connection.rb @ line 36:
Owner: Thin::Connection

def receive_data(data)
  @idle = false
  trace data
  process if @request.parse(data)
rescue InvalidRequest => e
  log_error("Invalid request", e)
  post_process Response::BAD_REQUEST
end

在这里我们看到了与 WEBrick 在处理来自客户端的原始数据时使用的方法#parse,它会解析客户端请求的原始数据并执行 #process 来处理 HTTP 请求:

From: lib/thin/connection.rb @ line 47:
Owner: Thin::Connection

def process
  if threaded?
    @request.threaded = true
    EventMachine.defer { post_process(pre_process) }
  else
    @request.threaded = false
    post_process(pre_process)
  end
end

如果当前的连接允许并行处理多个用户的请求,那么就会在 EventMachine.defer 的 block 中执行两个方法#pre_process 和#post_process:

From: lib/thin/connection.rb @ line 63:
Owner: Thin::Connection

def pre_process
  @request.remote_address = remote_address
  @request.async_callback = method(:post_process)

  response = AsyncResponse
  catch(:async) do
    response = @app.call(@request.env)
  end
  response
rescue Exception => e
  unexpected_error(e)
  can_persist? && @request.persistent? ? Response::PERSISTENT_ERROR : Response::ERROR
end

在#pre_process 中没有做太多的事情,只是调用了 Rack 应用的 #call 方法,得到一个三元组 response,在这之后将这个数组传入#post_process 方法

From: lib/thin/connection.rb @ line 95:
Owner: Thin::Connection

def post_process(result)
  return unless result
  result = result.to_a
  return if result.first == AsyncResponse.first

  @response.status, @response.headers, @response.body = *result
  @response.each do |chunk|
    send_data chunk
  end
rescue Exception => e
  unexpected_error(e)
  close_connection
ensure
  if @response.body.respond_to?(:callback) && @response.body.respond_to?(:errback)
    @response.body.callback { terminate_request }
    @response.body.errback  { terminate_request }
  else
    terminate_request unless result && result.first == AsyncResponse.first
  end
end

#post_response 方法将传入的数组赋值给 response 的 status、headers 和 body 这三部分,在这之后通过 #send_data 方法将 HTTP 响应以块的形式写回 Socket;写回结束后可能会调用对应的 callback 并关闭持有的 request 和 response 两个实例变量。上述方法中调用的#send_data 继承自 EventMachine::Connection 类。

到此为止,我们对于 Thin 是如何处理来自用户的 HTTP 请求的就比较清楚了,我们可以看到 Thin 本身并没有做一些类似解析 HTTP 数据包以及发送数据的问题,它使用了来自 Rack 和 EventMachine 两个开源框架中很多已有的代码逻辑,确实只做了一些胶水的事情。对于 Rack 是如何工作的我们在前面的文章谈谈Rack协议与实现中已经介绍过了;虽然我们看到了很多与 EventMachine 相关的代码,但是到这里我们仍然对 EventMachine 不是太了解。

EventMachine 和 Reactor 模式

为了更好地理解 Thin 的工作原理,在这里我们会介绍一个 EventMachine 和 Reactor 模式。

EventMachine 其实是一个使用 Ruby 实现的事件驱动的并行框架,它使用 Reactor 模式提供了事件驱动的 IO 模型,如果你对 Node.js 有所了解的话,那么你一定对事件驱动这个词并不陌生,EventMachine 的出现主要是为了解决两个核心问题:

  • 为生产环境提供更高的可伸缩性、更好的性能和稳定性;
  • 为上层提供了一些能够减少高性能的网络编程复杂性的 API;

其实 EventMachine 的主要作用就是将所有同步的 IO 都变成异步的,调度都通过事件来进行,这样用于监听用户请求的进程不会被其他代码阻塞,能够同时为更多的客户端提供服务;在这一节中,我们需要了解一下在 Thin 中使用的 EventMachine 中几个常用方法的实现。

启动事件循环

EventMachine 其实就是一个事件循环(Event Loop),当我们想使用 EventMachine 来处理某些任务时就一定需要调用 .run 方法启动这个事件循环来接受外界触发的各种事件:

From: lib/eventmachine.rb @ line 149:
Owner: #<Class:EventMachine>

def self.run blk=nil, tail=nil, &block
  # ...
  begin
    @reactor_pid = Process.pid
    @reactor_running = true
    initialize_event_machine
    (b = blk || block) and add_timer(0, b)
    if @next_tick_queue && !@next_tick_queue.empty?
      add_timer(0) { signal_loopbreak }
    end
    @reactor_thread = Thread.current

    run_machine
  ensure
    until @tails.empty?
      @tails.pop.call
    end

    release_machine
    cleanup_machine
    @reactor_running = false
    @reactor_thread = nil
  end
end

在这里我们会使用 .initialize_event_machine 初始化当前的事件循环,其实也就是一个全局的 Reactor 的单例,最终会执行 Reactor#initialize_for_run 方法:

From: lib/em/pure_ruby.rb @ line 522:
Owner: EventMachine::Reactor

def initialize_for_run
  @running = false
  @stop_scheduled = false
  @selectables ||= {}; @selectables.clear
  @timers = SortedSet.new # []
  set_timer_quantum(0.1)
  @current_loop_time = Time.now
  @next_heartbeat = @current_loop_time + HeartbeatInterval
end

在启动事件循环的过程中,它还会将传入的 block 与一个 interval 为 0 的键组成键值对存到 @timers 字典中,所有加入的键值对都会在大约 interval 的时间过后执行一次 block。

随后执行的#run_machine 在最后也会执行 Reactor 的#run 方法,该方法中包含一个 loop 语句,也就是我们一直说的事件循环

From: lib/em/pure_ruby.rb @ line 540:
Owner: EventMachine::Reactor

def run
  raise Error.new( "already running" ) if @running
  @running = true

  begin
    open_loopbreaker

    loop {
      @current_loop_time = Time.now

      break if @stop_scheduled
      run_timers
      break if @stop_scheduled
      crank_selectables
      break if @stop_scheduled
      run_heartbeats
    }
  ensure
    close_loopbreaker
    @selectables.each {|k, io| io.close}
    @selectables.clear

    @running = false
  end
end

在启动事件循环之间会在 #open_loopbreaker 中创建一个 LoopbreakReader 的实例绑定在 127.0.0.1 和随机的端口号组成的地址上,然后开始运行事件循环。

ruby-thin-server-2.png

在事件循环中,Reactor 总共需要执行三部分的任务,分别是执行定时器、处理 Socket 上的事件以及运行心跳方法。

无论是运行定时器还是执行心跳方法其实都非常简单,只要与当前时间进行比较,如果到了触发的时间就调用正确的方法或者回调,最后的#crank_selectables 方法就是用于处理 Socket 上读写事件的方法了:

From: lib/em/pure_ruby.rb @ line 540:
Owner: EventMachine::Reactor

def crank_selectables
  readers = @selectables.values.select { |io| io.select_for_reading? }
  writers = @selectables.values.select { |io| io.select_for_writing? }

  s = select(readers, writers, nil, @timer_quantum)

  s and s[1] and s[1].each { |w| w.eventable_write }
  s and s[0] and s[0].each { |r| r.eventable_read }

  @selectables.delete_if {|k,io|
    if io.close_scheduled?
      io.close
      begin
        EventMachine::event_callback io.uuid, ConnectionUnbound, nil
      rescue ConnectionNotBound; end
      true
    end
  }
end

上述方法会在 Socket 变成可读或者可写时执行 #eventable_write 或 #eventable_read 执行事件的回调,我们暂时放下这两个方法,先来了解一下 EventMachine 是如何启动服务的。

在启动服务的过程中,最重要的目的就是创建一个 Socket 并绑定在指定的 ip 和端口上,在实现这个目的的过程中,我们使用了以下的几个方法,首先是 EventMachine.start_server:

From: lib/eventmachine.rb @ line 516:
Owner: #<Class:EventMachine>

def self.start_server server, port=nil, handler=nil, *args, &block
  port = Integer(port)
  klass = klass_from_handler(Connection, handler, *args)

  s = if port
        start_tcp_server server, port
      else
        start_unix_server server
      end
  @acceptors[s] = [klass, args, block]
  s
end

该方法其实使我们在使用 EventMachine 时常见的接口,只要我们想要启动一个新的 TCP 或者 UNIX 服务器,就可以上述方法,在这里会根据端口号是否存在,选择执行 .start_tcp_server 或者 .start_unix_server 创建一个新的 Socket 并存储在 @acceptors 中:

From: lib/em/pure_ruby.rb @ line 184:
Owner: #<Class:EventMachine>

def self.start_tcp_server host, port
  (s = EvmaTCPServer.start_server host, port) or raise "no acceptor"
  s.uuid
end

EventMachine.start_tcp_server 在这里也只做了个[转发]方法的作用的,直接调用 EvmaTCPServer.start_server 创建一个新的 Socket 对象并绑定到传入的 上:

From: lib/em/pure_ruby.rb @ line 1108:
Owner: #<Class:EventMachine::EvmaTCPServer>

def self.start_server host, port
  sd = Socket.new( Socket::AF_LOCAL, Socket::SOCK_STREAM, 0 )
  sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true )
  sd.bind( Socket.pack_sockaddr_in( port, host ))
  sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough.
  EvmaTCPServer.new sd
end

方法的最后会创建一个新的 EvmaTCPServer 实例的过程中,我们需要通过#fcntl 将 Socket 变成非阻塞式的:

From: lib/em/pure_ruby.rb @ line 687:
Owner: EventMachine::Selectable

def initialize io
  @io = io
  @uuid = UuidGenerator.generate
  @is_server = false
  @last_activity = Reactor.instance.current_loop_time

  m = @io.fcntl(Fcntl::F_GETFL, 0)
  @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | m)

  @close_scheduled = false
  @close_requested = false

  se = self; @io.instance_eval { @my_selectable = se }
  Reactor.instance.add_selectable @io
end

不只是 EvmaTCPServer,所有的 Selectable 子类在初始化的最后都会将新的 Socket 以 uuid 为键存储到 Reactor 单例对象的 @selectables 字典中:

From: lib/em/pure_ruby.rb @ line 532:
Owner: EventMachine::Reactor

def add_selectable io
  @selectables[io.uuid] = io
end

在整个事件循环的大循环中,这里存入的所有 Socket 都会被 #select 方法监听,在响应的事件发生时交给合适的回调处理,作者在Redis中的事件循环一文中也介绍过非常相似的处理过程。

ruby-thin-server-3.png

所有的 Socket 都会存储在一个 @selectables 的哈希中并由#select 方法监听所有的读写事件,一旦相应的事件触发就会通过 eventable_read 或者 eventable_write 方法来响应该事件。

处理读写事件

所有的读写事件都是通过 Selectable 和它的子类来处理的,在 EventMachine 中,总共有以下的几种子类:

ruby-thin-server-4.png

所有处理服务端读写事件的都是 Selectable 的子类,也就是 EvmaTCPServer 和 EvmaUNIXServer,而所有处理客户端读写事件的都是 StreamObject 的子类 EvmaTCPServer 和 EvmaUNIXClient。

当我们初始化的绑定在 上的 Socket 对象监听到了来自用户的 TCP 请求时,当前的 Socket 就会变得可读,事件循环中的#select 方法就会调用 EvmaTCPClient#eventable_read 通知由一个请求需要处理:

From: lib/em/pure_ruby.rb @ line 1130:
Owner: EventMachine::EvmaTCPServer

def eventable_read
  begin
    10.times {
      descriptor, peername = io.accept_nonblock
      sd = EvmaTCPClient.new descriptor
      sd.is_server = true
      EventMachine::event_callback uuid, ConnectionAccepted, sd.uuid
    }
  rescue Errno::EWOULDBLOCK, Errno::EAGAIN
  end
end

在这里会尝试多次#accept_non_block 当前的 Socket 并会创建一个 TCP 的客户端对象 EvmaTCPClient,同时通过 .event_callback 方法发送 ConnectionAccepted 消息。

EventMachine::event_callback 就像是一个用于处理所有事件的中心方法,所有的回调都要通过这个中继器进行调度,在实现上就是一个庞大的 if、else 语句,里面处理了 EventMachine 中可能出现的 10 种状态和操作:

ruby-thin-server-5.png

大多数事件在触发时,都会从 @conns 中取出相应的 Connection 对象,最后执行合适的方法来处理,而这里触发的 ConnectionAccepted 事件是通过以下的代码来处理的:

From: lib/eventmachine.rb @ line 1462:
Owner: #<Class:EventMachine>

def self.event_callback conn_binding, opcode, data
  if opcode == # ...
    # ...
  elsif opcode == ConnectionAccepted
    accep, args, blk = @acceptors[conn_binding]
    raise NoHandlerForAcceptedConnection unless accep
    c = accep.new data, *args
    @conns[data] = c
    blk and blk.call(c)
    c
  else
    # ...
  end
end

上述的 accep 变量就是我们在 Thin 调用 .start_server 时传入的 Connection 类,在这里我们初始化了一个新的实例,同时以 Socket 的 uuid 作为键存到 @conns 中。

在这之后 #select 方法就会监听更多 Socket 上的事件了,当这个 “accept” 后创建的 Socket 接收到数据时,就会触发下面的 #eventable_read 方法:

From: lib/em/pure_ruby.rb @ line 1130:
Owner: EventMachine::StreamObject

def eventable_read
  @last_activity = Reactor.instance.current_loop_time
  begin
    if io.respond_to?(:read_nonblock)
      10.times {
        data = io.read_nonblock(4096)
        EventMachine::event_callback uuid, ConnectionData, data
      }
    else
      data = io.sysread(4096)
      EventMachine::event_callback uuid, ConnectionData, data
    end
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, SSLConnectionWaitReadable
  rescue Errno::ECONNRESET, Errno::ECONNREFUSED, EOFError, Errno::EPIPE, OpenSSL::SSL::SSLError
    @close_scheduled = true
    EventMachine::event_callback uuid, ConnectionUnbound, nil
  end
end

方法会从 Socket 中读取数据并通过 .event_callback 发送 ConnectionData 事件:

From: lib/eventmachine.rb @ line 1462:
Owner: #<Class:EventMachine>

def self.event_callback conn_binding, opcode, data
  if opcode == # ...
    # ...
  elsif opcode == ConnectionData
    c = @conns[conn_binding] or raise ConnectionNotBound, "received data #{data} for unknown signature: #{conn_binding}"
    c.receive_data data
  else
    # ...
  end
end

从上述方法对 ConnectionData 事件的处理就可以看到通过传入 Socket 的 uuid 和数据,就可以找到上面初始化的 Connection 对象,#receive_data 方法就能够将数据传递到上层,让用户在自定义的 Connection 中实现自己的处理逻辑,这也就是 Thin 需要覆写 #receive_data 方法来接受数据的原因了。

当 Thin 以及 Rack 应用已经接收到了来自用户的请求、完成处理并返回之后经过一系列复杂的调用栈就会执行 Connection#send_data 方法:

From: lib/em/connection.rb @ line 324:
Owner: EventMachine::Connection

def send_data data
  data = data.to_s
  size = data.bytesize if data.respond_to?(:bytesize)
  size ||= data.size
  EventMachine::send_data @signature, data, size
end

From: lib/em/pure_ruby.rb @ line 172:
Owner: #<Class:EventMachine>

def self.send_data target, data, datalength
  selectable = Reactor.instance.get_selectable( target ) or raise "unknown send_data target"
  selectable.send_data data
end

From: lib/em/pure_ruby.rb @ line 851:
Owner: EventMachine::StreamObject

def send_data data
  unless @close_scheduled or @close_requested or !data or data.length <= 0
    @outbound_q << data.to_s
  end
end

经过一系列同名方法的调用,在调用栈末尾的 StreamObject#send_data 中,将所有需要写入的数据全部加入 @outbound_q 中,这其实就是一个待写入数据的队列。

当 Socket 变得可写之后,就会由 #select 方法触发 #eventable_write 将 @outbound_q 队列中的数据通过 #write_nonblock 或者 syswrite 写入 Socket,也就是将请求返回给客户端。

From: lib/em/pure_ruby.rb @ line 823:
Owner: EventMachine::StreamObject

def eventable_write
  @last_activity = Reactor.instance.current_loop_time
  while data = @outbound_q.shift do
    begin
      data = data.to_s
      w = if io.respond_to?(:write_nonblock)
            io.write_nonblock data
          else
            io.syswrite data
          end

      if w < data.length
        @outbound_q.unshift data[w..-1]
        break
      end
    rescue Errno::EAGAIN, SSLConnectionWaitReadable, SSLConnectionWaitWritable
      @outbound_q.unshift data
      break
    rescue EOFError, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::EPIPE, OpenSSL::SSL::SSLError
      @close_scheduled = true
      @outbound_q.clear
    end
  end
end

关闭 Socket

当数据写入时发生了 EOFError 或者其他错误时就会将 close_scheduled 标记为 true,在随后的事件循环中会关闭 Socket 并发送 ConnectionUnbound 事件:

From: lib/em/pure_ruby.rb @ line 540:
Owner: EventMachine::Reactor

def crank_selectables
  # ...

  @selectables.delete_if {|k,io|
    if io.close_scheduled?
      io.close
      begin
        EventMachine::event_callback io.uuid, ConnectionUnbound, nil
      rescue ConnectionNotBound; end
      true
    end
  }
end

.event_callback 在处理 ConnectionUnbound 事件时会在 @conns 中将结束的 Connection 剔除:

def self.event_callback conn_binding, opcode, data
  if opcode == ConnectionUnbound
    if c = @conns.delete( conn_binding )
      c.unbind
      io = c.instance_variable_get(:@io)
      begin
        io.close
      rescue Errno::EBADF, IOError
      end
    elsif c = @acceptors.delete( conn_binding )
    else
      raise ConnectionNotBound, "received ConnectionUnbound for an unknown signature: #{conn_binding}"
    end
  elsif opcode = 1
    #...
  end
end

在这之后会调用 Connection 的 #unbind 方法,再次执行 #close 确保 Socket 连接已经断掉了。

EventMachine 在处理用户的请求时,会通过一个事件循环和一个中心化的事件处理中心 .event_callback 来响应所有的事件,你可以看到在使用 EventMachine 时所有的响应都是异步的,尤其是对 Socket 的读写,所有外部的输入在 EventMachine 看来都是一个事件,它们会被 EventMachine 选择合适的处理器进行转发。

I/O 模型

Thin 本身其实没有实现任何的 I/O 模型,它通过对 EventMachine 进行封装,使用了其事件驱动的特点,为上层提供了处理并发 I/O 的 Reactor 模型,在不同的阶段有着不同的工作流程,在启动 Thin 的服务时,Thin 会直接通过 .start_server 创建一个 Socket 监听一个 组成的元组:

ruby-thin-server-6.png

当服务启动之后,就可以接受来自客户端的 HTTP 请求了,处理 HTTP 请求总共需要三个模块的合作,分别是 EventMachine、Thin 以及 Rack 应用:

ruby-thin-server-7.png

在上图中省略了 Rack 的处理部分,不过对于其他部分的展示还是比较详细的,EventMachine 负责对 TCP Socket 进行监听,在发生事件时通过 .event_callback 进行处理,将消息转发给位于 Thin 中的 Connection,该类以及模块负责处理 HTTP 协议相关的内容,将整个请求包装成一个 env 对象,调用 #call 方法。

在这时就开始了返回响应的逻辑了,#call 方法会返回一个三元组,经过 Thin 中的 #send_data 最终将数据写入 outbound_q 队列中等待处理:

ruby-thin-server-8.png

EventMachine 会通过一个事件循环,使用#select 监听当前 Socket 的可读写状态,并在合适的时候触发#eventable_write 从 outbound_q 队列中读取数据写入 Socket,在写入结束后 Socket 就会被关闭,整个请求的响应也就结束了。

ruby-thin-server-9.png

Thin 使用了 EventMachine 作为底层处理 TCP 协议的框架,提供了事件驱动的 I/O 模型,也就是我们理解的 Reactor 模型,对于每一个 HTTP 请求都会创建一个对应的 Connection 对象,所有的事件都由 EventMachine 来派发,最大程度做到了 I/O 的读写都是异步的,不会阻塞当前的线程,这也是 Thin 以及 Node.js 能够并发处理大量请求的原因。

Thin 作为一个 Ruby 社区中简单的 webserver,其实本身没有做太多的事情,只是使用了 EventMachine 提供的事件驱动的 I/O 模型,为上层提供了更加易用的 API,相比于其他同步处理请求的 webserver,Reactor 模式的优点就是 Thin 的优点,主程序只负责监听事件和分发事件,一旦涉及到 I/O 的工作都尽量使用回调的方式处理,当回调完成后再发送通知,这种方式能够减少进程的等待时间,时刻都在处理用户的请求和事件。

原文来自: 浅谈Thin的事件驱动模型


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK