10

【代码鉴赏】简单优雅的JavaScript代码片段(二):流控和重试

 2 years ago
source link: https://segmentfault.com/a/1190000040920165
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本系列上一篇文章:【代码鉴赏】简单优雅的JavaScript代码片段(一):异步控制

流控(又称限流,控制调用频率)

后端为了保证系统稳定运行,往往会对调用频率进行限制(比如每人每秒不得超过10次)。为了避免造成资源浪费或者遭受系统惩罚,前端也需要主动限制自己调用API的频率。

前端需要大批量拉取列表时,或者需要对每一个列表项调用API查询详情时,尤其需要进行限流。

这里提供一个流控工具函数wrapFlowControl,它的好处是:

  • 使用简单、对调用者透明:只需要包装一下你原本的异步函数,即可得到拥有流控限制的函数,它与原本的异步函数使用方式相同。const apiWithFlowControl = wrapFlowControl(callAPI, 2);
  • 不会丢弃任何一次调用(不像防抖节流)。每一次调用都会被执行、得到相应的结果。只不过可能会为了控制频率而被延迟执行。

使用示例:

// 创建了一个调度队列
const apiWithFlowControl = wrapFlowControl(callAPI, 2);

// ......

<button
  onClick={() => {
    const count = ++countRef.current;
    // 请求调度队列安排一次函数调用
    apiWithFlowControl(count).then((result) => {
      // do something with api result
    });
  }}
>
  Call apiWithFlowControl
</button>

codesandbox在线示例

这个方案的本质是,先通过wrapFlowControl创建了一个调度队列,然后在每次调用apiWithFlowControl的时候,请求调度队列安排一次函数调用。

wrapFlowControl的代码实现:

const ONE_SECOND_MS = 1000;

/**
 * 控制函数调用频率。在任何一个1秒的区间,调用fn的次数不会超过maxExecPerSec次。
 * 如果函数触发频率超过限制,则会延缓一部分调用,使得实际调用频率满足上面的要求。
 */
export function wrapFlowControl<Args extends any[], Ret>(
  fn: (...args: Args) => Promise<Ret>,
  maxExecPerSec: number
) {
  if (maxExecPerSec < 1) throw new Error(`invalid maxExecPerSec`);
  // 调度队列,记录将要执行的任务
  const queue: QueueItem[] = [];
  // 最近一秒钟的执行记录,用于判断执行频率是否超出限制
  const executed: ExecutedItem[] = [];

  return function wrapped(...args: Args): Promise<Ret> {
    return enqueue(args);
  };

  function enqueue(args: Args): Promise<Ret> {
    return new Promise((resolve, reject) => {
      queue.push({ args, resolve, reject });
      scheduleCheckQueue();
    });
  }

  function scheduleCheckQueue() {
    const nextTask = queue[0];
    // 仅在queue为空时,才会停止scheduleCheckQueue递归调用
    if (!nextTask) return;
    cleanExecuted();
    if (executed.length < maxExecPerSec) {
      // 最近一秒钟执行的数量少于阈值,才可以执行下一个task
      queue.shift();
      execute(nextTask);
      scheduleCheckQueue();
    } else {
      // 过一会再调度
      const earliestExecuted = executed[0];
      const now = new Date().valueOf();
      const waitTime = earliestExecuted.timestamp + ONE_SECOND_MS - now;
      setTimeout(() => {
        // 此时earliestExecuted已经可以被清除,给下一个task的执行提供配额
        scheduleCheckQueue();
      }, waitTime);
    }
  }

  function cleanExecuted() {
    const now = new Date().valueOf();
    const oneSecondAgo = now - ONE_SECOND_MS;
    while (executed[0]?.timestamp <= oneSecondAgo) {
      executed.shift();
    }
  }

  function execute({ args, resolve, reject }: QueueItem) {
    const timestamp = new Date().valueOf();
    fn(...args).then(resolve, reject);
    executed.push({ timestamp });
  }

  type QueueItem = {
    args: Args;
    resolve: (ret: Ret) => void;
    reject: (error: any) => void;
  };

  type ExecutedItem = {
    timestamp: number;
  };
}

延迟确定函数逻辑

从上面的示例可以看出,在使用wrapFlowControl的时候,你需要预先定义好异步函数callAPI的逻辑,才能得到流控函数。

但是在一些特殊场景中,我们要在发起调用的时候,才确定异步函数应该执行什么逻辑。即将“定义时确定”推迟到“调用时确定”。因此我们实现了另一个工具函数createFlowControlScheduler

在上面的使用示例中,DemoWrapFlowControl就是一个例子:我们在用户点击按钮的时候,才决定要调用API1还是API2。

// 创建一个调度队列
const scheduleCallWithFlowControl = createFlowControlScheduler(2);

// ......

<div style={{ marginTop: 24 }}>
  <button
    onClick={() => {
      const count = ++countRef.current;
      // 在调用时才决定要执行的异步操作
      // 将异步操作加入调度队列
      scheduleCallWithFlowControl(async () => {
        // 流控会保障这个异步函数的执行频率
        if (count % 2 === 1) {
          return callAPI1(count);
        } else {
          return callAPI2(count);
        }
      }).then((result) => {
        // do something with api result
      });
    }}
  >
    Call scheduleCallWithFlowControl
  </button>
</div>

codesandbox在线示例

这个方案的本质是,先通过createFlowControlScheduler创建了一个调度队列,然后每当scheduleCallWithFlowControl接受到一个异步任务,就会将它加入调度队列。调度队列会确保所有异步任务都被调用(按照加入队列的顺序),并且任务执行频率不超过指定的值。

createFlowControlScheduler的实现其实非常简单,基于前面的wrapFlowControl实现:

/**
 * 类似于wrapFlowControl,只不过将task的定义延迟到调用wrapper时才提供,
 * 而不是在创建flowControl wrapper时就提供
 */
export function createFlowControlScheduler(maxExecPerSec: number) {
  return wrapFlowControl(async <T>(task: () => Promise<T>) => {
    return task();
  }, maxExecPerSec);
}

如何改造我们的工具函数,让它能够支持“每分钟不得超过n次”的频率限制?
如何改造我们的工具函数,让它能够同时支持“每秒钟不得超过n次”且“每分钟不得超过m次”的频率限制?如何实现更灵活的调度队列?

举个例子,频率限制为“每秒钟不得超过10次”且“每分钟不得超过30次”。它的意义在于,允许短时间内的突发高频调用(通过放松秒级限制),同时又阻止高频调用持续太长之间(通过分钟级限制)。

前面我们已经得到了一个在前端限制调用频率的方案。但是,即使我们已经在前端限制了调用频率,依然可能遇到错误:

  1. 前端的流控无法完全满足后端的流控限制。后端可能会对所有用户的调用之和做一个整体限制。比如所有用户的调用频率不能超过每秒一万次,前端流控无法对齐这种限制。
  2. 非流控错误。比如后端服务或网络不稳定,造成的短暂不可用。

因此,面对这些前端不可避免的错误,需要通过重试来得到结果。这里提供一个重试工具函数wrapRetry,它的好处是:

  • 使用简单、对调用者透明:与前面的流控工具函数一样,只需要包装一下你原本的异步函数,即可得到自动重试的函数,它与原本的异步函数使用方式相同。
  • 支持自定义要重试的错误类型、重试次数、重试等待时间。

使用方式:

const apiWithRetry = wrapRetry(
  callAPI,
  (error, retryCount) => error.type === "throttle" && retryCount <= 5
);

它的使用方式与wrapFlowControl类似。

wrapRetry代码实现:

/**
 * 捕获到特定的失败以后会重试。适合无副作用的操作。
 * 比如数据请求可能被流控拦截,就可以用它来做自动重试。
 */
export function wrapRetry<Args extends any[], Ret>(
  fn: (...args: Args) => Promise<Ret>,
  shouldRetry: (error: any, retryCount: number) => boolean,
  startRetryWait: number = 1000
) {
  return async function wrapped(...args: Args): Promise<Ret> {
    return callFn(args, startRetryWait, 0);
  };

  async function callFn(
    args: Args,
    wait: number,
    retryCount: number
  ): Promise<Ret> {
    try {
      return await fn(...args);
    } catch (error) {
      if (shouldRetry(error, retryCount)) {
        if (wait > 0) await timeout(wait);
        // nextWait是wait的 1 ~ 2 倍
        // 如果startRetryWait是0,则wait总是0
        const nextWait = wait * (Math.random() + 1);
        return callFn(args, nextWait, retryCount + 1);
      } else {
        throw error;
      }
    }
  }
}

function timeout(wait: number) {
  return new Promise((res) => {
    setTimeout(() => {
      res(null);
    }, wait);
  });
}

其中,我们增加了一个优化点:让重试等待时间逐步增加。比如,第2次重试的等待时间是第一次重试等待时间的1 ~ 2 倍。这是为了尽可能减少调用次数,避免给正处于不稳定的后端带来更多压力。

没有选择2倍增加,是为了避免重试等待时间太长,降低用户体验。

值得一提的是,自动重试可以与前面的限流工具组合起来使用(得益于它们都对调用者透明,不改变函数使用方式):

const apiWithFlowControl = wrapFlowControl(callAPI, 2);
const apiWithRetry = wrapRetry(
  apiWithFlowControl,
  (error, retryCount) => error.type === "throttle" && retryCount <= 5
);

注意,限流包装在内部,重试包装在外部,这样才能保证重试发起的请求也能受到限流的控制。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK