22

ES数据可靠性分析

 3 years ago
source link: https://niyanchun.com/es-data-reliability.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

ES作为全文检索兼存储系统,数据可靠性至关重要,本文讨论ES是如何实现数据可靠性的。ES底层基于Lucene,所以有必要先搞清楚一些相关的概念。

refresh && flush && commit

Lucene中,有flush和commit的概念。所谓flush,就是定期将内存Buffer里面的数据刷新到Directory这样一个抽象的文件存储层,其实就是生成segment。需要注意的是, 因为操作系统file cache的原因,这里的刷新未必会真的落盘,一般只是从内存buffer刷到了file cache而已,实质还是在内存中,所以是一个相对比较高效和轻量级的操作。 flush方法的java doc是这样描述的:

Moves all in-memory segments to the Directory, but does not commit (fsync) them (call commit() for that).”

关于segment再补充一点:形成segment以后,数据就可以被搜索了,但因为flush一般比较频繁(ES里面执行频率默认是1秒),所以会产生很多小的segment文件,一方面太多的文件会占用太多的文件描述符;另一方面,搜索时文件太多也会影响搜索效率,所以Lucene有专门的Merge线程定期将小的segment文件merge为大文件。

Lucene的commit上面的Java doc已经提到了,它会调用fsync,commit方法的java doc如下:

Commits all pending changes (added and deleted documents, segment merges, added indexes, etc.) to the index, and syncs all referenced index files, such that a reader will see the changes and the index updates will survive an OS or machine crash or power loss.

因为会调用fsync,所以commit之后,文件肯定会被持久化到磁盘上,所以这是一个重操作,一方面是磁盘的性能比较差,另一方面是commit的时候会执行更新索引等操作。commit一般是当我们认为系统到达一个稳定点的时候,commit一次,类似于流式系统里面的checkpoint。当系统出现故障的时候,Lucene会从最近的一次commit point进行恢复,而不是最近的一次flush。

总结一下,flush会生成segment,之后数据就能被搜索了,是一个轻量级操作,但此时并不保证数据被持久化了。commit是一个比较重的落盘操作,用于持久化,不能被频繁执行。

以上是Lucene,现在我们来看ES。ES中有refresh和flush的概念,其实是和Lucene一一对应的,不过换了个名字。 ES里面的refresh就是Lucene里面的flush;ES里面的flush就是Lucene里面的commit。 所以,ES里面的refresh默认1秒执行一次,也就是数据写入ES之后最快1秒之后可以被搜索到,这也就是ES提供的近实时搜索NRT(Near Realtime)。而flush的执行时机有两个点:一个是ES会根据内存使用情况启发式的执行flush,另外一个时机是当translog达到512MB(默认值)时执行一次flush。网上很多文章(一般都比较早了)都提到每30分钟也会执行一次,但我在6.6版本的代码及文档里面没有找到这部分说明。

这里提到了translog,下面我们来介绍一下translog。

translog

仔细想一下上面介绍的Lucene的flush和commit,就会发现如果在数据还没有被commit之前,机器宕掉了,那上次commit之后到宕机前的数据就丢了。实际上,Lucene也是这么做的,异常恢复时会丢掉最后一次commit之后的数据(如果有的话)。这对于绝大多数业务是不能接受的,所以ES引入了translog来解决这个问题。其实也不算什么新技术,就是类似于传统DB里面的预写日志(WAL),不过在ES里面叫事务日志(transaction log),简称translog。

这样ES的写入的时候,先在内存buffer中进行Lucene documents的写入,写入成功后再写translog。内存buffer中的Lucene documents经过refresh会形成file system cache中的segment,此时,内容就可以被搜索到了。然后经过flush,持久化到磁盘上面。整个流程如下图:

QNBRv2E.png!mobile

这里有几个问题需要说明一下。

  1. 为了保证数据完全的可靠,一般的写入流程都是先写WAL,再写内存,但ES是先写内存buffer,然后再写translog。这个顺序目前没有找到官方的说明,网上大部分说的是写的过程比较复杂,容易出错,先写内存可以降低处理的复杂性。不过这个顺序个人认为对于用户而言其实不是很关键,因为不管先写谁,最终两者都写成功才会返回给客户端。
  2. translog的落盘(即图中的fsync过程)有两种策略,分别对应不同的可靠程度。第一种是每次请求(一个index、update、delete或者bulk操作)都会执行fsync,fsync成功后,才会给客户端返回成功,也就是请求同步刷盘,这种可靠性最高,只要返回成功,那数据一定已经落盘了,这也是默认的方式。第二种是异步的,按照定时达量的方式,默认每5秒或者512MB的时候就fsync一次。异步一般可以获得更高的吞吐量,但弊端是存在数据丢失的风险。
  3. ES的flush(或者Lucene的commit)也是落盘,为什么不直接用,而加一个translog?translog或者所有的WAL的一大特性就是他们都是追加写,这样大多数时候都可以实现顺序读写,读和写可以完全并发,所以性能一般都是非常好的。有一些测试表明,磁盘的顺序写甚至比内存的随机写更快,见 The Pathologies of Big Data 的Figure 3。
  4. translog不是全局的,而是每个shard(也就是Lucene的index)一个,且每个shard的所有translog中同一时刻只会有一个translog文件是可写的,其它都是只读的(如果有的话)。具体细节可查看 Translog 类的Java doc说明。
  5. translog的老化机制在6.0之前是segment flush到磁盘后,就删掉了。6.0之后是按定时达量的策略进行删除,默认是512MB或者12小时。

因为ES的版本更迭比较快,配置项也是经常变更,所以上面没有列出具体的配置项,这里的默认值都来自 ES 6.8 translog 文档,有兴趣的可以点击链接查看。

副本

引入translog之后,解决了进程突然挂掉或者机器突然宕机导致还处于内存,没有被持久化到磁盘的数据丢失的问题,但数据仅落到磁盘还是无法完全保证数据的安全,比如磁盘损坏等。分布式领域解决这个问题最直观和最简单的方式就是采用副本机制,比如HDFS、Kafka等都是此类代表,ES也使用了副本的机制。一个索引由一个primary和n(n≥0)个replica组成,replica的个数支持可以通过接口动态调整。为了可靠,primary和replica的shard不能在同一台机器上面。

这里要注意区分一下replica和shard的关系:比如创建一个索引的时候指定了5个shard,那么此时primary分片就有5个shard,每个replica也会有5个shard。我们可以通过接口随意修改replica的个数,但不能修改每个primary/replica包含5个shard这个事实。 当然,shard的个数也可以通过shrink接口进行调整,但这是一个很重的操作,而且有诸多限制,比如shard个数只能减少,且新个数是原来的因子,比如原来是8个shard,那只能选择调整为4、2或1个;如果原来是5个,那就只能调整为1个了。所以实际中,shard的个数一般要预先计划好(经验值是保证一个shard的大小在30~50GB之间),而replica的个数可以根据实际情况后面再做调整。

在数据写入的时候,数据会先写primary,写成功之后,同时分发给多个replica,待所有replica都返回成功之后,再返回给客户端。所以一个写请求的耗时可以粗略认为是写primary的时间+耗时最长的那个replica的时间。

引申:写入优化

总体来说,ES的写入能力不算太好,所以经常需要对写入性能做优化,除了保证良好的硬件配置外,还可以从ES自身进行机制进行优化,结合上面的介绍,可以很容易得出下面的一些优化手段:

  1. 如果对于搜索的实时性要求不高,可以适当增加refresh的时间,比如从默认的1秒改为30秒或者1min,甚至更长。如果是离线导入再搜索的场景,可以直接设置为"-1",即关闭自动的refresh,等导入完成后,通过接口手动refresh。其提高性能的原理是增加refresh的时间可以减少大量小的segment文件,这样在可以提高flush的效率,减小merge的压力。
  2. 如果对于数据可靠性要求不是特别高,可以将translog的落盘机制由默认的请求同步落盘,改为定时达量的异步落盘,提高落盘的效率。
  3. 如果对于数据可靠性要求不是特别高,可以在写入高峰期先不设置副本,待过了高峰之后再通过接口增加副本。这个可以通过ES的ILM策略,实现自动化。

优化往往都是有代价的,需要根据自己的实际场景进行评估。

引申:写入流程代码分析

本来计划是写一篇从原理、源码分析ES关于写入流程的文章,但发现已经有一些比较好的文章了,就换了个角度,从使用角度宏观介绍了用户一般比较关心的数据可靠性问题。这里推荐两篇介绍ES写入流程的文章:

这两篇文章都结合代码对写入流程进行了深入解析,文章非常不错,这里我再补充一些细节,供有兴趣debug源码的读者阅读。源码阅读环境搭建见我之前的文章 IDEA或Eclipse中编译调试ElasticSearch源码 ,里面使用的ES版本是6.6(6.6.3 snapshot),下面代码的说明也基于这个版本。

ES提供两种类型的接口:对外的REST接口,对内的Transport接口。早一点的版本中,transport接口也开放给外部客户端使用,但现在已经逐步只用于内部节点之间通信了。不过REST接口其实只是在Transport接口进行了封装,请求到ES之后还是会转发到Transport接口对应的处理器去处理。REST和Transport接口的处理器都是在 ActionModule 类中注册的:

static Map<String, ActionHandler<?, ?>> setupActions(List<ActionPlugin> actionPlugins) {
    
    // 省略部分代码

    ActionRegistry actions = new ActionRegistry();

    // Transport接口处理器注册
    actions.register(MainAction.INSTANCE, TransportMainAction.class);
    actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
    actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
    actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
    actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
    actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
    actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class);
    actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class);
    actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);

    // 省略部分代码
}


public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
    List<AbstractCatAction> catActions = new ArrayList<>();
    Consumer<RestHandler> registerHandler = a -> {
        if (a instanceof AbstractCatAction) {
            catActions.add((AbstractCatAction) a);
        }
    };

    // REST接口处理器注册
    registerHandler.accept(new RestMainAction(settings, restController));
    registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController));
    registerHandler.accept(new RestNodesStatsAction(settings, restController));
    registerHandler.accept(new RestNodesUsageAction(settings, restController));
    registerHandler.accept(new RestNodesHotThreadsAction(settings, restController));
    registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController));
    registerHandler.accept(new RestClusterStatsAction(settings, restController));
    registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter));
    registerHandler.accept(new RestClusterHealthAction(settings, restController));
    // 省略部分代码
}

所以我们要调试某个功能的时候其实只要在这里找到REST接口的处理器以及其对应的Transport接口对应的处理器即可。这里以数据写入为例,ES的写入主要分单条的index和批量的bulk操作,单条其实是bulk的一种特殊情况,所以处理复用的是bulk的逻辑。

写入的REST接口处理器是 RestIndexAction

// 单条
registerHandler.accept(new RestIndexAction(settings, restController));
// 批量
registerHandler.accept(new RestBulkAction(settings, restController));

它们的构造函数里面也明确的定义了我们使用的REST接口:

public RestIndexAction(Settings settings, RestController controller) {
    super(settings);
    controller.registerHandler(POST, "/{index}/{type}", this); // auto id creation
    controller.registerHandler(PUT, "/{index}/{type}/{id}", this);
    controller.registerHandler(POST, "/{index}/{type}/{id}", this);
    CreateHandler createHandler = new CreateHandler(settings);
    controller.registerHandler(PUT, "/{index}/{type}/{id}/_create", createHandler);
    controller.registerHandler(POST, "/{index}/{type}/{id}/_create", createHandler);
}

public RestBulkAction(Settings settings, RestController controller) {
    super(settings);

    controller.registerHandler(POST, "/_bulk", this);
    controller.registerHandler(PUT, "/_bulk", this);
    controller.registerHandler(POST, "/{index}/_bulk", this);
    controller.registerHandler(PUT, "/{index}/_bulk", this);
    controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
    controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);

    this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings);
}

这两个接口最终都会调用 TransportBulkAction ,该类会将bulk中的操作按照shard进行分类,然后交给 TransportBulkAction 执行。这里我以index操作为例列出调用链上的关键类和方法,供参考:

  1. RestIndexAction#prepareRequest :解析参数,构建 IndexRequest ,然后转发给 TransportBulkAction#doExecute 执行
  2. TransportBulkAction

    doExecute
    executeIngestAndBulk
    executeBulk->BulkOperation#doRun
    
  3. ReplicationOperation#execute :写主分片、写副本分片,等待返回。下面给出写主分片的调用,副本分片同理。

    • primaryResult = primary.perform(request); -> TransportReplicationAction#perform -> TransportShardBulkAction#shardOperationOnPrimary -> TransportShardBulkAction#performOnPrimary -> TransportShardBulkAction#executeBulkItemRequest ->

    TransportShardBulkAction#executeIndexRequestOnPrimary ->

    IndexShard#applyIndexOperationOnPrimary -> IndexShard#index ->

    InternalEngine#index

这里看下最后的 InternalEngine#index 部分关键代码:

@Override
public IndexResult index(Index index) throws IOException {    
    
    // 省略部分代码
    
    final IndexResult indexResult;
    if (plan.earlyResultOnPreFlightError.isPresent()) {
        indexResult = plan.earlyResultOnPreFlightError.get();
        assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
    } else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
        // 这里会调用 Lucene 的接口
        indexResult = indexIntoLucene(index, plan);
    } else {
        indexResult = new IndexResult(
                plan.versionForIndexing, getPrimaryTerm(), plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
    }
    if (index.origin().isFromTranslog() == false) {
        final Translog.Location location;
        if (indexResult.getResultType() == Result.Type.SUCCESS) {
            // Lucene写成功后写 translog
            location = translog.add(new Translog.Index(index, indexResult));
        } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
            // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
            final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
                index.startTime(), indexResult.getFailure().toString());
            location = innerNoOp(noOp).getTranslogLocation();
        } else {
            location = null;
        }
        indexResult.setTranslogLocation(location);
    }
        
    // 省略部分代码
    
}

在indexIntoLucene里面就可以看到熟悉的Lucene接口 IndexWriter 了:

private void addDocs(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
    if (docs.size() > 1) {
        indexWriter.addDocuments(docs);
    } else {
        indexWriter.addDocument(docs.get(0));
    }
    numDocAppends.inc(docs.size());
}


private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
    if (softDeleteEnabled) {
        if (docs.size() > 1) {
            indexWriter.softUpdateDocuments(uid, docs, softDeletesField);
        } else {
            indexWriter.softUpdateDocument(uid, docs.get(0), softDeletesField);
        }
    } else {
        if (docs.size() > 1) {
            indexWriter.updateDocuments(uid, docs);
        } else {
            indexWriter.updateDocument(uid, docs.get(0));
        }
    }
    numDocUpdates.inc(docs.size());
}

以上就是一些关键代码。

总结

ES的数据可靠性是从两个维度进行保证的:一方面,通过增加translog,解决了因为进程挂掉和机器宕机引发的内存数据丢失的问题,另一方面,通过副本机制解决了单点故障(当然还可以分担查询的压力。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK