
skynet Source Code Analysis: The Network Layer (Lua Layer)

 3 years ago
source link: https://segmentfault.com/a/1190000038662972

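This article walks through how a Lua service calls into the low-level interfaces of skynet's network layer. The Lua-level API lives mainly in lualib/skynet/socket.lua; see the official wiki at https://github.com/cloudwu/sk...

A simple example shows how a Lua service ultimately reaches the network layer's low-level interface: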

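`local socket = require "socket"
local skynet = require "skynet"

-- Example listen address (placeholder values); replace with your own.
local ip, port = "0.0.0.0", 8888

local function loop(fd)
    socket.start(fd)
    while true do
        -- readline takes the socket id first, then the separator
        local data = socket.readline(fd, "\n")
        if not data then
            -- the connection was closed
            socket.close(fd)
            return
        end
        print(data, #data)
    end
end

skynet.start(function()
    local listen_fd = socket.listen(ip, port)
    socket.start(listen_fd, function(fd, addr)
        print(string.format("connect fd[%d], addr[%s]", fd, addr))
        skynet.fork(loop, fd)
    end)
end)`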

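API call flow overview

When the service starts, it calls socket.listen to begin listening. The call chain is: driver.listen -> skynet_socket_listen -> socket_server_listen -> send_request, which finally writes the request to the send pipe. In terms of files, the Lua interface executes through socket.lua -> lua-socket.c -> skynet_socket.c -> socket_server.c.

Note: do_listen, called at the top of socket_server_listen, invokes the Unix socket system calls socket, bind, and listen in turn.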

`-- lualib/skynet/socket.lua
function socket.listen(host, port, backlog)
    if port == nil then
        -- accept a single "host:port" string
        host, port = string.match(host, "([^:]+):(.+)$")
        port = tonumber(port)
    end
    return driver.listen(host, port, backlog)
end`

`// lualib-src/lua-socket.c
static int
llisten(lua_State *L) {
    const char * host = luaL_checkstring(L,1);
    int port = luaL_checkinteger(L,2);
    int backlog = luaL_optinteger(L,3,BACKLOG);
    struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
    int id = skynet_socket_listen(ctx, host,port,backlog);
    if (id < 0) {
        return luaL_error(L, "Listen error");
    }

    lua_pushinteger(L,id);
    return 1;
}

// skynet-src/skynet_socket.c
int
skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) {
    uint32_t source = skynet_context_handle(ctx);
    return socket_server_listen(SOCKET_SERVER, source, host, port, backlog);
}

// skynet-src/socket_server.c
int
socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
    // do_listen performs socket(), bind() and listen()
    int fd = do_listen(addr, port, backlog);
    if (fd < 0) {
        return -1;
    }
    struct request_package request;
    int id = reserve_id(ss);
    if (id < 0) {
        close(fd);
        return id;
    }
    request.u.listen.opaque = opaque;
    request.u.listen.id = id;
    request.u.listen.fd = fd;
    // hand the request to the socket thread through the pipe
    send_request(ss, &request, 'L', sizeof(request.u.listen));
    return id;
}`
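The last step of this chain, handing a typed request from a worker thread to the socket thread through a pipe, can be sketched in isolation. The following is a minimal illustration rather than skynet's actual code: the `demo_*` names and the struct layout are assumptions made for this example, and only the idea of "a small header carrying the command type and payload length, followed by the payload" comes from the flow described above.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical, simplified stand-in for the listen payload of a request. */
struct demo_listen {
    int id;
    int fd;
    uintptr_t opaque;
};

/* Worker side: frame the payload with a 2-byte header (type, length)
 * and write the whole frame to the pipe in a single write() call. */
static int demo_send_request(int wfd, char type, const struct demo_listen *payload) {
    uint8_t buf[2 + sizeof(*payload)];
    buf[0] = (uint8_t)type;
    buf[1] = (uint8_t)sizeof(*payload);
    memcpy(buf + 2, payload, sizeof(*payload));
    ssize_t n = write(wfd, buf, sizeof(buf));
    return n == (ssize_t)sizeof(buf) ? 0 : -1;
}

/* Socket-thread side: read the header first, then exactly `len` payload bytes. */
static int demo_read_request(int rfd, char *type, struct demo_listen *out) {
    uint8_t header[2];
    if (read(rfd, header, 2) != 2) return -1;
    if (header[1] != sizeof(*out)) return -1;
    if (read(rfd, out, header[1]) != (ssize_t)header[1]) return -1;
    *type = (char)header[0];
    return 0;
}

/* Round trip through a real pipe; returns the id seen by the reader, or -1. */
static int demo_roundtrip(int id, int fd) {
    int p[2];
    if (pipe(p) != 0) return -1;
    struct demo_listen in = { id, fd, 0 };
    struct demo_listen out = { 0, 0, 0 };
    char type = 0;
    int ok = demo_send_request(p[1], 'L', &in) == 0
          && demo_read_request(p[0], &type, &out) == 0
          && type == 'L' && out.fd == fd;
    close(p[0]);
    close(p[1]);
    return ok ? out.id : -1;
}
```

Calling `demo_roundtrip(7, 42)` sends an 'L' request carrying id 7 and reads it back on the other end of the pipe. Writing each frame with one `write()` matters when several threads share the pipe: POSIX guarantees that writes of at most `PIPE_BUF` bytes are not interleaved.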

The socket connection process

A socket structure in skynet can be in one of several states:

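#define SOCKET_TYPE_INVALID 0 // free, available for use
#define SOCKET_TYPE_RESERVE 1 // reserved (id already allocated)
#define SOCKET_TYPE_PLISTEN 2 // pending listen (listening sockets only)
#define SOCKET_TYPE_LISTEN 3 // listening; can accept client connections (listening sockets only)
#define SOCKET_TYPE_CONNECTING 4 // connection in progress (the non-blocking connect has not completed yet)
#define SOCKET_TYPE_CONNECTED 5 // connected; can send and receive data
#define SOCKET_TYPE_HALFCLOSE 6
#define SOCKET_TYPE_PACCEPT 7 // pending accept (connection sockets only)
#define SOCKET_TYPE_BIND 8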

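After a worker thread calls socket.listen, the socket thread reads the request from the receiving end of the pipe, runs ctrl_cmd, and calls listen_socket (the 'L' case). At this point the socket's state is SOCKET_TYPE_PLISTEN, which listen_socket sets just before returning.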

`// skynet-src/socket_server.c
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
    ...
    case 'L':
        return listen_socket(ss,(struct request_listen *)buffer, result);
    ...
}

static int
listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) {
    int id = request->id;
    int listen_fd = request->fd;
    struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false);
    if (s == NULL) {
        goto _failed;
    }
    s->type = SOCKET_TYPE_PLISTEN;
    return -1;
    ...
}` 

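Next, the Lua service calls socket.start, which eventually makes the socket thread run start_socket. The socket's state becomes SOCKET_TYPE_LISTEN, and it waits for client connection requests.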

`// skynet-src/socket_server.c
  static int
  start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
      ...
      if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
          if (sp_add(ss->event_fd, s->fd, s)) {
              force_close(ss, s, &l, result);
              result->data = strerror(errno);
              return SOCKET_ERR;
          }
         s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
         s->opaque = request->opaque;
         result->data = "start";
         return SOCKET_OPEN;
     }
     ...
 }` 

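When a client initiates a connection, the epoll event fires and socket_server_poll calls report_accept.

report_accept calls the Unix accept system call to accept the client's request. Because the client has already initiated the connection, the call does not block.

reserve_id then obtains a free socket id from the socket pool.

new_fd and the assignments that follow initialize the new socket; its state is now SOCKET_TYPE_PACCEPT.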

`int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
    ...
    case SOCKET_TYPE_LISTEN: {
        int ok = report_accept(ss, s, result);
    ...
}

// return 0 when failed, or -1 when file limit
static int
report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
    union sockaddr_all u;
    socklen_t len = sizeof(u);
    int client_fd = accept(s->fd, &u.s, &len);
    ...
    int id = reserve_id(ss);
    struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
    ns->type = SOCKET_TYPE_PACCEPT;
    result->opaque = s->opaque;
    result->id = s->id;
    result->ud = id;
    result->data = NULL;

    ...
    return 1;
}` 
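How an id can be taken from a fixed-size socket pool without a lock can be sketched as follows. This is an illustration under stated assumptions, not the body of skynet's reserve_id: the `demo_*` names, the pool size of 4, and the use of C11 `<stdatomic.h>` are choices made for this example, and wrap-around of the id counter is deliberately left out.

```c
#include <assert.h>
#include <stdatomic.h>

#define DEMO_MAX_SOCKET 4      /* tiny pool for illustration only */
#define DEMO_TYPE_INVALID 0    /* slot is free */
#define DEMO_TYPE_RESERVE 1    /* slot has been claimed */

struct demo_slot {
    atomic_int type;
    int id;
};

struct demo_pool {
    atomic_int alloc_id;       /* ever-increasing id counter */
    struct demo_slot slot[DEMO_MAX_SOCKET];
};

static void demo_pool_init(struct demo_pool *p) {
    atomic_init(&p->alloc_id, 0);
    for (int i = 0; i < DEMO_MAX_SOCKET; i++) {
        atomic_init(&p->slot[i].type, DEMO_TYPE_INVALID);
        p->slot[i].id = 0;
    }
}

/* Claim a free slot: returns a fresh id, or -1 after probing
 * DEMO_MAX_SOCKET candidate ids without finding a free slot. */
static int demo_reserve_id(struct demo_pool *p) {
    for (int i = 0; i < DEMO_MAX_SOCKET; i++) {
        int id = atomic_fetch_add(&p->alloc_id, 1) + 1;
        struct demo_slot *s = &p->slot[(unsigned)id % DEMO_MAX_SOCKET];
        int expected = DEMO_TYPE_INVALID;
        /* Only one thread can flip INVALID -> RESERVE for a given slot. */
        if (atomic_compare_exchange_strong(&s->type, &expected, DEMO_TYPE_RESERVE)) {
            s->id = id;
            return id;
        }
    }
    return -1;
}

/* Return a slot to the pool so a later id can reuse it. */
static void demo_release_id(struct demo_pool *p, int id) {
    atomic_store(&p->slot[(unsigned)id % DEMO_MAX_SOCKET].type, DEMO_TYPE_INVALID);
}
```

The id keeps growing while the slot index is the id modulo the pool size, so a stale id held by some service is unlikely to match whatever socket later reuses the same slot.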

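Next, the Lua service calls socket.start(id) again; this time id refers to the connection socket, not the listening socket. The socket's state becomes SOCKET_TYPE_CONNECTED: the connection is established and data can be sent and received. That completes the socket connection process.

How the Lua service gets notified is covered later.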

`// skynet-src/socket_server.c
 static int
 start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
     ...
     s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
     ...
 }` 
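The two "pending" states and what socket.start does to them can be condensed into one small function. The sketch below reuses the state constants listed earlier; `demo_state_after_start` is a hypothetical helper written for this article, and leaving every other state unchanged is a simplification of what start_socket really does.

```c
#include <assert.h>

#define SOCKET_TYPE_INVALID    0
#define SOCKET_TYPE_RESERVE    1
#define SOCKET_TYPE_PLISTEN    2
#define SOCKET_TYPE_LISTEN     3
#define SOCKET_TYPE_CONNECTING 4
#define SOCKET_TYPE_CONNECTED  5
#define SOCKET_TYPE_HALFCLOSE  6
#define SOCKET_TYPE_PACCEPT    7
#define SOCKET_TYPE_BIND       8

/* State of a socket after a start request has been processed.
 * Listening side:  PLISTEN -> LISTEN
 * Accepted side:   PACCEPT -> CONNECTED
 * Any other state is returned unchanged in this simplified model. */
static int demo_state_after_start(int type) {
    switch (type) {
    case SOCKET_TYPE_PLISTEN:
        return SOCKET_TYPE_LISTEN;
    case SOCKET_TYPE_PACCEPT:
        return SOCKET_TYPE_CONNECTED;
    default:
        return type;
    }
}
```

Read this way, the whole connection process is two passes through the same function: once for the listening socket created by socket.listen, and once for each connection socket created by report_accept.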

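To close a socket, call socket.close.

There are two APIs for sending data: socket.write for normal sends and socket.lwrite for low-priority sends.

How the network layer notifies the Lua service

While the socket thread is running (in socket_server_poll), it calls forward_message_tcp when network data arrives.

forward_message_tcp calls the Unix read system call to read the data from the socket.

It then fills in result field by field: opaque is the address of the Lua service, id is the socket's id in the pool, ud is the number of bytes actually read, and data is the data itself.

Finally it returns SOCKET_DATA, indicating that data was received.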

`// skynet-src/socket_server.c
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
    ...
    default:
        if (e->read) {
            int type;
            if (s->protocol == PROTOCOL_TCP) {
                type = forward_message_tcp(ss, s, &l, result);
        ...
    return type;
}

static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
    int sz = s->p.size;
    char * buffer = MALLOC(sz);
    int n = (int)read(s->fd, buffer, sz);
    ...
    result->opaque = s->opaque;
    result->id = s->id;
    result->ud = n;
    result->data = buffer;
    return SOCKET_DATA;
}` 

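Because socket_server_poll returned SOCKET_DATA, skynet_socket_poll calls forward_message.

forward_message first builds the payload to send (sm), using the result returned above.

It then builds the skynet message structure. Because the message is sent by the network layer rather than by a specific service, the source and session fields are simply set to 0.

Finally, skynet_context_push sends the message to the service address associated with the socket.

At this point, the network message has been delivered to the specific Lua service.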

`// skynet-src/skynet_socket.c
int
skynet_socket_poll() {
    struct socket_server *ss = SOCKET_SERVER;
    assert(ss);
    struct socket_message result;
    int more = 1;
    int type = socket_server_poll(ss, &result, &more);
    switch (type) {
    case SOCKET_DATA:
        forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
        break;
        ...
    return 1;
}

// mainloop thread
static void
forward_message(int type, bool padding, struct socket_message * result) {
    struct skynet_socket_message *sm;
    size_t sz = sizeof(*sm);
    ...
    sm = (struct skynet_socket_message *)skynet_malloc(sz);
    sm->type = type;
    sm->id = result->id;
    sm->ud = result->ud;
    ...
    struct skynet_message message;
    message.source = 0;
    message.session = 0;
    message.data = sm;
    message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);

    if (skynet_context_push((uint32_t)result->opaque, &message)) {
        // todo: report somewhere to close socket
        // don't call skynet_socket_close here (It will block mainloop)
        skynet_free(sm->buffer);
        skynet_free(sm);
    }
}` 
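The expression assigned to message.sz above stores two things in one size_t: the payload size in the low bits and the message type in the top byte. A minimal sketch of that packing follows. The `DEMO_*` macros are written for this example; they mirror how I understand MESSAGE_TYPE_SHIFT and the matching mask to be defined, which is an assumption, while the value 6 for PTYPE_SOCKET comes from the Lua listing in this article.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_PTYPE_SOCKET 6
/* Assumed layout: the top byte of size_t carries the type. */
#define DEMO_TYPE_SHIFT ((sizeof(size_t) - 1) * 8)
#define DEMO_TYPE_MASK  (SIZE_MAX >> 8)

/* Combine a payload size and a message type into one size_t. */
static size_t demo_pack_sz(size_t sz, int type) {
    return (sz & DEMO_TYPE_MASK) | ((size_t)type << DEMO_TYPE_SHIFT);
}

/* Recover the message type from the top byte. */
static int demo_unpack_type(size_t packed) {
    return (int)(packed >> DEMO_TYPE_SHIFT);
}

/* Recover the payload size from the remaining low bits. */
static size_t demo_unpack_size(size_t packed) {
    return packed & DEMO_TYPE_MASK;
}
```

For example, `demo_pack_sz(24, DEMO_PTYPE_SOCKET)` yields a value from which `demo_unpack_type` returns 6 and `demo_unpack_size` returns 24. The receiving side uses the type to pick the registered protocol, which is how the message reaches the "socket" handler in the Lua service.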

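Processing flow in the Lua service

When network data reaches a Lua service, lualib/skynet/socket.lua provides the corresponding handling. The dispatch function looks up a handler in socket_message; the network message types include normal data transfer (DATA), connection (CONNECT), close (CLOSE), error (ERROR), and others.

The DATA handler pushes the data sent by the client into that socket's buffer (driver.push).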

`-- lualib/skynet/socket.lua
 skynet.register_protocol {
     name = "socket",
     id = skynet.PTYPE_SOCKET,       -- PTYPE_SOCKET = 6
     unpack = driver.unpack,
     dispatch = function (_, _, t, ...)
         socket_message[t](...)
     end
 }

 -- SKYNET_SOCKET_TYPE_DATA = 1
 socket_message[1] = function(id, size, data)
     local s = socket_pool[id]
     ...
     local sz = driver.push(s.buffer, buffer_pool, data, size)
     ...
 end` 

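socket.read(id, sz) reads sz bytes from a socket. If the buffer already holds enough data, the bytes are popped from the buffer and returned directly (the first driver.pop); otherwise the current coroutine is suspended (suspend(s)) and resumed once enough data has arrived or the connection has been closed.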

`-- lualib/skynet/socket.lua
 function socket.read(id, sz)
     local s = socket_pool[id]
     assert(s)
     ...
     local ret = driver.pop(s.buffer, buffer_pool, sz)
     if ret then
         return ret
     end
     if not s.connected then
         return false, driver.readall(s.buffer, buffer_pool)
     end

     assert(not s.read_required)
     s.read_required = sz
     suspend(s)
     ret = driver.pop(s.buffer, buffer_pool, sz)
     if ret then
         return ret
     else
         return false, driver.readall(s.buffer, buffer_pool)
     end
 end` 
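The "pop if enough data is buffered, otherwise wait" rule in socket.read can be illustrated with a plain byte buffer. This is a simplified model, not the driver's real buffer (which is a list of received blocks): `demo_buffer`, `demo_push`, `demo_pop`, and the fixed capacity are all assumptions made for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define DEMO_CAP 256   /* fixed capacity, for illustration only */

struct demo_buffer {
    char data[DEMO_CAP];
    size_t size;       /* number of bytes currently buffered */
};

/* Append received bytes. Returns 0 on success, -1 if they do not fit. */
static int demo_push(struct demo_buffer *b, const char *src, size_t n) {
    if (n > DEMO_CAP - b->size) return -1;
    memcpy(b->data + b->size, src, n);
    b->size += n;
    return 0;
}

/* Pop exactly sz bytes into out. Returns 1 on success. Returns 0 and
 * consumes nothing when fewer than sz bytes are buffered; this is the
 * case in which socket.read would suspend the coroutine. */
static int demo_pop(struct demo_buffer *b, char *out, size_t sz) {
    if (b->size < sz) return 0;
    memcpy(out, b->data, sz);
    memmove(b->data, b->data + sz, b->size - sz);
    b->size -= sz;
    return 1;
}
```

With 3 bytes buffered, a request for 5 bytes fails without consuming anything; once 3 more bytes arrive, the same request succeeds and leaves 1 byte behind for the next read.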



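socket.readline(id, sep) reads data from a socket up to the separator sep, which defaults to "\n", so by default it reads one line. Note: because the separator can be specified, this API is not limited to reading lines.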
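Splitting on an arbitrary separator amounts to searching the buffered bytes for the first occurrence of sep. The helper below is a hypothetical sketch of that search over a flat byte array; it is not the driver's implementation, and `demo_find_line` is a name invented for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Look for the first occurrence of sep in buf[0..len).
 * Returns the length of the data before the separator when a complete
 * record is available, or -1 when the separator has not arrived yet. */
static long demo_find_line(const char *buf, size_t len, const char *sep) {
    size_t seplen = strlen(sep);
    if (seplen == 0 || len < seplen) return -1;
    for (size_t i = 0; i + seplen <= len; i++) {
        if (memcmp(buf + i, sep, seplen) == 0) {
            return (long)i;
        }
    }
    return -1;
}
```

A multi-byte separator such as "\r\n" works the same way as "\n", and a return value of -1 corresponds to the case where the caller has to wait for more data.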

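socket.abandon(id) clears the data structures for socket id within the current service but does not close the socket; it is used to hand the id over to another service. A common design has a master service accept external connections and, once a connection is established, assign the socket to a slave service, which reduces the load on the master.

The socket library is typically used as follows: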

-- master service
local listen_fd = socket.listen(ip, port)  -- listen on an address
socket.start(listen_fd, function(fd, addr)
    slave.post.start(fd)  -- a client has connected; hand it over to the slave
    socket.abandon(fd)
end)

-- slave service
function accept.start(fd)
    socket.start(fd)  -- take over the socket
    ...
end
