30

类似Github的webhook实现

 4 years ago
source link: https://www.tuicool.com/articles/nQzE3yQ
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Webhook是一种非常强大的推送机制,如果熟悉WordPress的同学可以类比构建WP生态的各类钩子函数。Githubt通过webhook让开发人员可以监听仓库的变化触发持续集成工具的运作,比如Travis CI。

需求

大家都看过Github上的webhook,可以对某一个repository设置webhook监听仓库变化,比如push,page_build等event(X-GitHub-Event)。

Evm2Afa.jpg!web 每一次发送都会有个uuid作为标记,并写入到HTTP Header的X-GitHub-Delivery,并且对于发送失败的历史记录,可以点击Redeliver进行重发。

zyqmUjJ.jpg!web

如果把该功能作为一个单独的服务提供方,其根本诉求就是要准确记录到此服务与Internet每一次网络交互的过程,包括发送请求数据和响应结果数据。继续细化,

  1. 该服务提供方接收客户端的调用,发送请求到客户端所指定的url并获取响应。
  2. 记录每次客户端的原始请求内容(url, method, header, body)以及response(header, body, code, etc.)
  3. 需要考虑到客户端重试或重复调用的情况,需要记录每个请求的调用次数以及最后一次调用时间。(客户端调用时可能会传入一个clientId用于接收端去重,如未传服务提供方根据请求生成一个唯一uuid)
  4. 提供接口对指定的某个发送历史进行重发。

思考

拿到需求首先要思考一下这个服务会与哪些系统有交互?

  1. 请求要发送到指定的URL上,那么第一个交互的系统是某个公网服务。
  2. 发送的历史要能保留,说明数据是需要持久化的。第二个交互的系统是数据库。

好了,交互的系统确定后,接下来应该考虑顺序问题,是先发送请求到公网服务还是先操作数据库?我们逐个来分析一下

  • 方案A:先发送请求再将记录写入数据库。问题:如果请求发送了但是数据库写入失败,此时就会造成数据不一致,因为遗漏了发送历史。
  • 方案B:先写入数据库再发送请求。问题:与A类似的,如果数据库写入成功,请求发送失败,比如网络断开等原因。此时数据也会不一致。虽然有了发送历史,但实际发送是失败的。
  • 方案C:先写入数据库接着发送请求最后更新数据。这种方案相对来说比A和B要可靠。第一步写入请求的数据并将状态(status)置为 sending ,发送完成再更新status为 success 或者 failure
  • 方案D:先写入数据库,将status设置为 sending ,启动一个新的线程扫描该表,对status为 ready 的记录进行发送,发送完成再更新status为 success 或者 failure

前两个方案肯定是不可取的,我们来分析一下后两个方案的优劣。方案C的缺点在于第一步写入数据库完成后,发送请求时系统宕机,该记录会一直处于 sending 状态。好在整体方案会提供一个人工重试(点击Redeliver)的机制,可以事后弥补。优点在于串行化的思维编码比较容易。方案D的优点就是对于一直处于 sending 状态的历史记录,可以自动进行补发,因为有线程不断扫描。缺点在于这个扫描线程可能会加重数据库的负担。如果要想并行扫描那么又要解决任务分片和编排的问题(参考elestic-job),编码相对较难。

针对以上所述的利弊,最终我们选择方案C。其实把发送网络请求换成发送消息到MQ,那么方案D就很类似大家所熟知的“本地事务表”的解决方案,是将MQ的事务和本地数据库事务绑定的一种思路。

实现

数据结构,定义一个request和response

@Data
@NoArgsConstructor
public class WebHookRequest {

    @NotBlank
    private String url;

    private String method;

    private Map<String, String> headers;

    private String body;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class WebHookResponse {

    private String id;
    private String data;
    private int code;

    public boolean isSuccessful() {
        return this.code >= 200 && this.code < 300;
    }

}

发送方法

public WebHookResponse send(WebHookRequest webHookRequest, String id, boolean retryOnServerError) {
    //如果traceId为空则根据请求参数生成一个md5的值作为traceId
    if (StringUtils.isBlank(id)) {
        //拼接请求参数
        String content = webHookRequest.getUrl()
                .concat(webHookRequest.getMethod())
                .concat(jsonMapper.toJson(webHookRequest.getHeaders()))
                .concat(webHookRequest.getBody());
        id = DigestUtils.md5Hex(content);
    }
        // 查找发送记录
    WebHookRecord webHookRecord = webHookRecordMapper.findById(id, TenantContext.getCurrentTenant());
    if (webHookRecord == null) {
        webHookRecord = createFromRequest(webHookRequest);
        webHookRecord.setId(id);
        try {
                        // 新建发送记录
            webHookRecordMapper.insert(webHookRecord);
        } catch (Exception ex) {
            LOGGER.warn("Duplicate key for [{}]", webHookRecord.getId());
            throw new BizException(B_01000, ex);
        }
    } else {
                // 如果历史已经存在,可能是客户端发送重发请求,那么先判断是否可以重发,sending状态在一定时间间隔内不能重发,避免频繁失败
        if (this.shouldResend(webHookRecord.getSendTime(), webHookRecord.getStatus())) {
            // 可以重发那么更新状态为sending
                        this.updateResendStatus(webHookRecord);
        } else {
            LOGGER.warn("Should not resend key for [{}]", webHookRecord.getId());
            throw new BizException(B_01000);
        }
    }
        // 通过网络发送请求,如果失败会直接更新status为failure并抛出异常,发送过程的异常和得到响应对方服务报异常还是不一样的
    WebHookResponse webHookResponse = this.doSend(id, webHookRequest);
        // 发送完成后更新status
    this.onResponse(id, webHookResponse);
    if (retryOnServerError && webHookResponse.getCode() >= 500) {
        throw new RetryException("server error!");
    }
    return webHookResponse;
}

/*
* 判断是否可以进行重发
*/
private boolean shouldResend(DateTime sendTime, String status) {
    if (!STATUS_SENDING.equals(status)) {
        return true;
    }
    // 发送状态超过60s可重发
    return sendTime.plusSeconds(60).isBefore(DateTime.now());
}

/*
* 调用网络接口进行发送
*/
private WebHookResponse doSend(String id, WebHookRequest webHookRequest) {
    //调用httpClient
    String responseBodyString = null;
    Response response;
    try {
        response = webHookIntegrationService.send(webHookRequest.getUrl(),
                webHookRequest.getMethod(),
                webHookRequest.getHeaders(),
                webHookRequest.getBody());
        if (response.body() != null) {
            responseBodyString = response.body().string();
        }
    } catch (Exception ex) {
        this.onSendFailed(id, ExceptionUtils.getStackTrace(ex));
        throw new RetryException("http send error!", ex);
    }
    return new WebHookResponse(id, responseBodyString, response.code());
}

/*
* 请求正常返回后的处理
* 这里有一个乐观锁的问题,如果同时有多个线程调用改办法修改同一个历史请求,只有一个线程会更新成功
*/
private void onResponse(String id, WebHookResponse webHookResponse) {
    WebHookRecord webHookRecord = webHookRecordMapper.findById(id, TenantContext.getCurrentTenant());
    webHookRecord.setResponse(webHookResponse.getData());
    webHookRecord.setResponseCode(webHookResponse.getCode());
    //更新状态
    if (webHookResponse.isSuccessful()) {
        webHookRecord.setStatus(STATUS_SUCCESS);
    } else {
        webHookRecord.setStatus(STATUS_ERROR);
    }
    int count = webHookRecordMapper.updateResponse(webHookRecord);
    if (count == 0) {
        LOGGER.warn("Attempt to update WebHook id={} with wrong version ({})", id, webHookRecord.getVersion());
    }
}

/*
* 再次发送时更新状态 (真正发送网络请求前)
*/
private void updateResendStatus(WebHookRecord webHookRecord) {
    webHookRecord.setStatus(STATUS_SENDING);
    webHookRecord.setSendTime(DateTime.now());
    int count = webHookRecordMapper.resend(webHookRecord);
    if (count == 0) {
        throw new OptimisticLockingFailureException("Attempt to update WebHook id=" + webHookRecord.getId() + " with wrong version (" + webHookRecord.getVersion() + ")");
    }
}

为了避免对于同一个请求有多个线程同时发起重试的问题,我们在 updateResendStatus 方法上使用了乐观锁,如果其中一个线程更新状态成功,那么其他线程会因乐观锁问题直接失败,不会走到真正发送网络的请求的那步。也就是说在发送网络请求前过滤绝大部分并发问题。

最后,可以使用一个单元测试,模拟并发请求进行验证。

@Test
public void testMultiThreads() throws InterruptedException {
    //调用send接口
    WebHookRequest webHookRequest = new WebHookRequest();
    webHookRequest.setUrl("http://www.qq.com");
    webHookRequest.setMethod("POST");
    Map<String, String> headers = new HashMap<>(1);
    webHookRequest.setHeaders(headers);

    int nLoop = 100;
    String clientId = UUID.randomUUID().toString();
    CountDownLatch countDownLatch = new CountDownLatch(nLoop);
    Runnable task = () -> {
        try {
            givenToken().when().body(webHookRequest).post("/webhooks/send?clientId=" + clientId)
                    .then()
                    .statusCode(HttpStatus.OK.value())
                    .extract()
                    .response();
        } finally {
            countDownLatch.countDown();
        }
    };
    ExecutorService executorService = new ThreadPoolBuilder.FixedThreadPoolBuilder().setThreadNamePrefix("thread-webhook").setPoolSize(100).build();
    for (int i = 0; i < nLoop; i++) {
        executorService.execute(task);
    }
    countDownLatch.await();

    int times = webHookMapper.findById(clientId, 1L).getTimes();
        // 验证数据库里记录的发送次数是否 等于 真正调用发送网络请求接口的次数
    Mockito.verify(webHookIntegrationService, Mockito.times(times)).send(Mockito.any(),
            Mockito.any(), Mockito.any(), Mockito.any());
}

总结

如果想进一步提升性能,可以使用支持异步的httpclient工具包, onResponse 在callback中进行处理。

整体来说,这是个很简单的小需求,但要考虑周全其实还是要费一番功夫的。究其本质就是异构系统间的数据一致性问题。当我们把发送网络请求换成写数据到redis,到MQ,到另一个微服务时,就会发现它们存在的共通性。一次请求涉及多个系统,并且无法包裹进同一个事务,就会产生这样的问题。至于解决方案是二阶段提交,事后补偿,还是自动对账,就要根据自己的业务特点来选择了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK