3

Redis(5.0.3)里一个简单请求如何被处理

 2 years ago
source link: http://cbsheng.github.io/posts/redis%E9%87%8C%E4%B8%80%E4%B8%AA%E7%AE%80%E5%8D%95%E8%AF%B7%E6%B1%82%E5%A6%82%E4%BD%95%E8%A2%AB%E5%A4%84%E7%90%86/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Redis(5.0.3)里一个简单请求如何被处理


set text “hello world” 从进入服务器到输出结果,整个流程是怎样的?带着这个问题来看看源码。

在命令进入服务之前,服务器需要先初始化好自己

与这个场景相关的,两件事比较重要:

  1. 先注册好所有支持的Redis命令
  2. 初始化并启动事件循环器

所有的Redis命令先是被组织成一个table,里面包含每个命令的名称、对应处理函数、flag、调用次数等信息。

// src/server.c
// 太多了只给出小部分
struct redisCommand redisCommandTable[] = {
  {"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},
  {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
  {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
  // ...
}

这个redisCommandTable被映射到哈希结构里,就是Redis内部定义的Dict结构。O(1)就能取到对应命令的处理函数。

// src/server.c
void initServerConfig(void) {
  // ...
  // 设置命令映射表
  populateCommandTable();
  // ...
}

void populateCommandTable(void) {
  // ...
  int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

  for (j = 0; j < numcommands; j++) {
    struct redisCommand *c = redisCommandTable+j;
    // ...

  	// 把每个命令放到哈希结构中
  	retval1 = dictAdd(server.commands, sdsnew(c->name), c);
  	retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);

  }
}

然后启动事件循环器,监听默认的6379端口,并设置socket为no_blocking。

将监听6379端口的socket包装为一个aeFileEvent对象,通过aeCreateFileEvent()注册到事件循环器里。注册时,会还会注册一个回调函数acceptTcpHandler()。即有新连接要到来时,就调用回调函数进行accept。

accept到的就是client连接。accept返回一个文件描述符,也将它注册进事件循环器里。这样之后client发起一个set text "hello world"请求到达server时,文件描述符变得可读,事件循环器会捕获到此事件并调用对应的回调函数readQueryFromClient()

每个连接进来,都会创建一个对应的client对象,里面存储client发起的命令,输入/出缓冲区等信息。

所有client对象也会被挂到server.clients链表上。

// src/server.c
void initServer(void) {
  // ...
  server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

  if (server.port != 0 &&
      listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
    exit(1)

  // ...
  // 将监听client请求的IO文件事件加到事件循环器中
  for (j = 0; j < server.ipfd_count; j++) {
      // acceptTcpHandler 就是回调函数
      if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
      {
        serverPanic(
          "Unrecoverable error creating server.ipfd file event.");
      }
    }
  // ...
}

// src/networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    // ...
    while(max--) {
      // 通过accept系统调用获取client连接的文件描述符,及client ip/port
      cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
      // ...
      serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
      acceptCommonHandler(cfd,0,cip);
    }
}

static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    // 为新连接绑定一个client对象,并且将连接对应的文件描述符放进事件循环器里
    if ((c = createClient(fd)) == NULL) {
        // ...
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
  // ...
}

client *createClient(int fd) {
    // 为这个连接创建一个client
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
        // 将文件描述符设置为noblocking
        anetNonBlock(NULL,fd);
        // 设置TCP_NODELAY,关闭Nagle算法
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive) // 开启tcp keepalive
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // ok,可以将文件描述符加到事件循环器里了
        if (aeCreateFileEvent(server.el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR)
        {
          // ...
        }
    }

    // 默认客户端操作redis第一个db
    selectDb(c,0);
    // ... 这里是一系列client对象的初始化操作
    // 新client追加到server.clients链表后
    if (fd != -1) linkClient(c);
    initClientMultiState(c);
    return c;
}

回调函数就是在事件循环器中被触发的。拿到有就绪事件的文件描述符后,判断是读还是写,再调用对应的回调函数(fe->rfileProc()fe->wfileProc())。回调函数的类型为void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask)

// src/server.c
// aeMain() -> aeProcessEvents()
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
  // ...
  numevents = aeApiPoll(eventLoop, tvp)

  // ...
  for (j = 0; j < numevents; j++) {
    aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    int mask = eventLoop->fired[j].mask;
    int fd = eventLoop->fired[j].fd
    // ...
    if (!invert && fe->mask & mask & AE_READABLE) {
      // 读事件处理函数
      fe->rfileProc(eventLoop,fd,fe->clientData,mask);
      fired++;
    }

    if (fe->mask & mask & AE_WRITABLE) {
      if (!fired || fe->wfileProc != fe->rfileProc) {
        // 写事件处理函数
        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
        fired++;
      }
    }
  }
  // ...
}

现在可以发送命令了

127.0.0.1:6379> set text “hello world”

敲下回车键,命令通过TCP协议到了server后,accept到新的套接字,并且是可读状态。

这时注册的回调函数readQueryFromClient()就被触发调用。它是所有命令的入口。

上面的源码也能看到,注册readQueryFromClient()之前,是为新连接创建一个client对象,命令的内容,client的属性,输入/输出缓冲区等都是与这个client绑定的。

readQueryFromClient()通过系统调用read()从套接字里读取命令,放在client.querybuf。读取的字节数是有限制的,读取到的内容也有长度长限,超过上限就会拒连释放client对象。

对于流入流出redis的字节数,自然也是在read和write这两个环节中被记录。

// src/server.h
#define PROTO_IOBUF_LEN         (1024*16)

// src/networking.c
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;

    readlen = PROTO_IOBUF_LEN;

    // ...

    // 读取client发送的命令
    nread = read(fd, c->querybuf+qblen, readlen);

    // ...

    // 设置buf的长度字段
    sdsIncrLen(c->querybuf,nread);
    // ...
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    // 记录入流量的字节数
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        // 当前client的query缓冲区超出最大限制,拒绝命令的进一步处理,并打log
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    // 在processInputBufferAndReplicate做分发处理命令
    processInputBufferAndReplicate(c);
}

processInputBufferAndReplicate()会区分client是不是master节点来响应命令,两种处理方式当然有些差别。不过这里我们先不关心replicate。

读取的命令放在client.querybuf后,是需要按照redis的通信协议进行解析的。解析完做一些常规的检查,例如命令是否存在,命名参数是否合法等。

检查是在processCommand()里进行的。通过后,就可以调用注册好的命令回调函数来处理了。逻辑入口就是processInputBuffer()

// src/networking.c
void processInputBuffer(client *c) {
    server.current_client = c;

    while(c->qb_pos < sdslen(c->querybuf)) {
        // 这里有一些检查,主要用于判断命令的处理是否有必要进行
        // 用户在此之前执行了client pause命令
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
        if (c->flags & CLIENT_BLOCKED) break;
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        // https://redis.io/topics/protocol redis使用的通信文本协议
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                // 数组
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        // 解释client命令文本
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 正式响应client命令
            if (processCommand(c) == C_OK) {
                // ...

                // 如果client是正在执行着阻塞式命令,就先不resetclient
                // 否则,执行的其他非阻塞命令就resetclient,这样可以接着处理此client接下来发送的命令
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            // ...
        }
    }
    // ...
}

processCommand()方法可长了,会有一系列的不同模式下的处理方式与检查方式,例如quit命令的特殊处理、权限的鉴权、redis cluster模式下对命令的响应逻辑,设置了min-slaves-to-write后的检查拦截逻辑等待。最后前面拦截检查都通过后,最后就调用call()来执行命令。

call()是redis里执行命令的核心,所以前后肯定又是一系列的检查。关键的就是c->cmd->proc(c);,它就是调用命令注册的回调函数。

// src/server.c
int processCommand(client *c) {
  // ...

  /* Exec the command */
  if (c->flags & CLIENT_MULTI &&
      c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
      c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
  {
    queueMultiCommand(c);
    addReply(c,shared.queued);
  } else {
    call(c,CMD_CALL_FULL);
    c->woff = server.master_repl_offset;
    if (listLength(server.ready_keys))
      handleClientsBlockedOnKeys();
  }
  return C_OK;
}

void call(client *c, int flags) {
    // ...
    start = ustime();
    c->cmd->proc(c);
    // 命令执行耗时会被记录,这样才会有慢查询日志
    duration = ustime()-start
    // ...
}

而本文的set命令对应回调函数就是setCommand(),它位于src/t_string.c。执行成功后一般给client返回OK,这个OK字符串就是通过addReply()方法写到client的输出缓冲区的。

setCommand()里的一顿操作我们先不关注,重点来看看addReply()

// src/networking.c
void addReply(client *c, robj *obj) {
    // 检查该client能不能回复, 可以回复的话,加到server.client_pending_write队列上
    // 例如对client为master节点默认就是不回复命令执行结果
    if (prepareClientToWrite(c) != C_OK) return;

    // 判断返回内容的类型,字符串还是数字
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

看完addReply()的整个处理过程,也看不到怎样给client发送回复,都是把回复内容写到输出缓冲区里。怎么返回client结果呢?

返回命令执行结果

其实在redis里,事件的处理顺序是:

  1. client输出缓存有内容则返回执行结果
  2. 读取client发送的命令,执行命令,缓存执行结果
  3. 执行定时任务

就是这样周而复始。

这个循环在哪儿呢?就是aeMain()啦。而第一步给client返回执行结果的逻辑被放置在eventLoop->beforesleep()这个钩子里。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 处理IO事件与时间事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

// 这是注册到钩子的回调函数
void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...

    // 如果有client的输出缓冲区不为空,则监听client对应的文件描述符的可写状态
    // 等于将文件描述符注册到事件循环器中
    handleClientsWithPendingWrites();
  	// ...
}

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    // 所有输出缓存区有内容的client都在clients_pending_write队列上
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        // ...
        // 先写一次
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // 之前那一次没写完,那就监听可写事件,注册回调函数,直至写完为止
        // 如果输出内容多,tcp协议栈的写缓存有限,不可能一次性全写
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            // 回调函数sendReplyToClient其实本质也是调用writeToClient。
            // 只不过包装一层,使其可以作为ae的回调函数
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}

以上就是一个简单命令的处理流程。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK