
SOFAJRaft Source Code Reading (5): A First Look at RheaKV - Akai-yuan

source link: https://www.cnblogs.com/akai-yuan/p/17089993.html

SOFAJRaft-RheaKV is an embedded, distributed, highly available, strongly consistent KV storage library built on SOFAJRaft and RocksDB. A SOFAJRaft-RheaKV cluster consists of three core components: PD, Store, and Region.
@Author: Akai-yuan
@Updated: 2023/2/3

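1. Architecture

The SOFAJRaft-RheaKV storage library is built around three core components: PD, Store, and Region. It targets use cases such as lightweight state/metadata storage, cluster synchronization, and distributed locking.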

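(1) Scheduling

PD is the global control node. It handles scheduling for the whole cluster, generates Region IDs, and maintains the RegionRouteTable routing table. One PDServer can manage multiple clusters, which are isolated from one another by clusterId. The PD Server has to be deployed separately, but many scenarios do not need this kind of cluster self-management, so RheaKV also supports running without PD: set the fake option of PlacementDriverOptions to true. PD normally schedules a Region through the response it returns to that Region's heartbeat. Once the Region has carried out the instruction, PD receives the Region's change information in the next heartbeat and updates its routing and state tables.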

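(2) Storage

A Store is a physical storage node in the cluster and contains one or more Regions. Typically one Node is responsible for one Store, so a Store can be viewed as a container of Regions that holds multiple data shards. A Store proactively reports a StoreHeartbeatRequest heartbeat to PD, which is handled by PD's handleStoreHeartbeat. The heartbeat carries basic information about the Store, such as how many Regions it holds and which Regions have their Leader on this Store.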

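(3) Data

A Region is the smallest unit of KV data and can be thought of as a data partition or shard. Each Region covers a left-closed, right-open key range [startKey, endKey), and it can split automatically and have its replicas relocated automatically based on metrics such as request traffic, load, and data size. A Region corresponds to an actual data range within a Store. Each Region has multiple replicas stored on different Stores; together they form a Raft Group, and Raft log replication keeps the data in sync across all nodes of that group. The Leader of a Region proactively reports a RegionHeartbeatRequest heartbeat to PD, which is handled by PD's handleRegionHeartbeat, and PD detects whether a Region has changed through the Region's Epoch.
To make the roles of PD, Store, and Region clearer, here is the official diagram: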

[Figure: official SOFAJRaft-RheaKV architecture diagram showing PD, Store, and Region (2784327-20230203212602133-1742689353.jpg)]

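2. Initialization

We start from the RheaKV part of the JRaft-Example module, beginning with the main method of com.alipay.sofa.jraft.example.rheakv.Server1.

  1. It declares two option objects, PlacementDriverOptions and StoreEngineOptions.
  2. It defines RheaKVStoreOptions, wires pdOpts and storeOpts into it, and sets the cluster name, whether parallel compression is enabled, and the list of server IP:port addresses ("127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183").
  3. It creates a Node.
  4. It registers a shutdown hook for graceful shutdown. (The author analyzed shutdown hooks in an earlier post; see SOFAJRaft Source Code Reading (3): How ShutdownHook Shuts Down Gracefully.)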
public static void main(final String[] args) {
    final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured()
        .withFake(true) // use a fake pd
        .config();
    final StoreEngineOptions storeOpts = StoreEngineOptionsConfigured.newConfigured() //
        // the StoreEngine supports two storage implementations: MemoryDB and RocksDB
        .withStorageType(StorageType.RocksDB)
        .withRocksDBOptions(RocksDBOptionsConfigured.newConfigured().withDbPath(Configs.DB_PATH).config())
        .withRaftDataPath(Configs.RAFT_DATA_PATH)
        .withServerAddress(new Endpoint("127.0.0.1", 8181))
        .config();
    final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
        .withClusterName(Configs.CLUSTER_NAME) //
        .withUseParallelCompress(true) //
        .withInitialServerList(Configs.ALL_NODE_ADDRESSES)
        .withStoreEngineOptions(storeOpts) //
        .withPlacementDriverOptions(pdOpts) //
        .config();
    System.out.println(opts);
    final Node node = new Node(opts);
    node.start();
    Runtime.getRuntime().addShutdownHook(new Thread(node::stop));
    System.out.println("server1 start OK");
}

About the Node implementation:
It holds a RheaKVStoreOptions and a RheaKVStore.

public class Node {
    private final RheaKVStoreOptions options;
    private RheaKVStore              rheaKVStore;
    public Node(RheaKVStoreOptions options) {
        this.options = options;
    }
    public void start() {
        this.rheaKVStore = new DefaultRheaKVStore();
        this.rheaKVStore.init(this.options);
    }
    public void stop() {
        this.rheaKVStore.shutdown();
    }
    public RheaKVStore getRheaKVStore() {
        return rheaKVStore;
    }
}

(1) Initializing DefaultRheaKVStore

As shown above, Node#start instantiates a DefaultRheaKVStore and then calls its init method to initialize it.

public synchronized boolean init(final RheaKVStoreOptions opts) {
        // return early if the store has already been started
    	if (this.started) {
            LOG.info("[DefaultRheaKVStore] already started.");
            return true;
        }
        DescriberManager.getInstance().addDescriber(RouteTable.getInstance());
        this.opts = opts;
        // initialize PD according to PlacementDriverOptions
        final PlacementDriverOptions pdOpts = opts.getPlacementDriverOptions();
        final String clusterName = opts.getClusterName();
        Requires.requireNonNull(pdOpts, "opts.placementDriverOptions");
        Requires.requireNonNull(clusterName, "opts.clusterName");
        if (Strings.isBlank(pdOpts.getInitialServerList())) {
            // if blank, inherit the parent's value
            pdOpts.setInitialServerList(opts.getInitialServerList());
        }
        // PD disabled: instantiate a FakePlacementDriverClient
        if (pdOpts.isFake()) {
            this.pdClient = new FakePlacementDriverClient(opts.getClusterId(), clusterName);
        // PD enabled: instantiate a RemotePlacementDriverClient
        } else {
            this.pdClient = new RemotePlacementDriverClient(opts.getClusterId(), clusterName);
        }
        // initialize the FakePlacementDriverClient/RemotePlacementDriverClient
        if (!this.pdClient.init(pdOpts)) {
            LOG.error("Fail to init [PlacementDriverClient].");
            return false;
        }
        // initialize the compression strategy
        ZipStrategyManager.init(opts);
        // initialize the storage engine
        final StoreEngineOptions stOpts = opts.getStoreEngineOptions();
        if (stOpts != null) {
            stOpts.setInitialServerList(opts.getInitialServerList());
            this.storeEngine = new StoreEngine(this.pdClient, this.stateListenerContainer);
            if (!this.storeEngine.init(stOpts)) {
                LOG.error("Fail to init [StoreEngine].");
                return false;
            }
        }
        // get the IP and port of the current node
        final Endpoint selfEndpoint = this.storeEngine == null ? null : this.storeEngine.getSelfEndpoint();
        final RpcOptions rpcOpts = opts.getRpcOptions();
        Requires.requireNonNull(rpcOpts, "opts.rpcOptions");
        // create the RpcService, overriding getLeader so that the local RegionEngine is consulted first
    	this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {
            @Override
            public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
                final Endpoint leader = getLeaderByRegionEngine(regionId);
                if (leader != null) {
                    return leader;
                }
                return super.getLeader(regionId, forceRefresh, timeoutMillis);
            }
        };
        if (!this.rheaKVRpcService.init(rpcOpts)) {
            LOG.error("Fail to init [RheaKVRpcService].");
            return false;
        }
        // number of failover retries, 2 by default
        this.failoverRetries = opts.getFailoverRetries();
        // future timeout in milliseconds, 5000 by default
        this.futureTimeoutMillis = opts.getFutureTimeoutMillis();
        // whether to read only from the leader, true by default
        this.onlyLeaderRead = opts.isOnlyLeaderRead();
        // initialize kvDispatcher; useParallelKVExecutor is true by default
        if (opts.isUseParallelKVExecutor()) {
            final int numWorkers = Utils.cpus();
            // buffer size is numWorkers * 16
            final int bufSize = numWorkers << 4;
            final String name = "parallel-kv-executor";
            final ThreadFactory threadFactory = Constants.THREAD_AFFINITY_ENABLED
                    ? new AffinityNamedThreadFactory(name, true) : new NamedThreadFactory(name, true);
            // initialize the dispatcher
            this.kvDispatcher = new TaskDispatcher(bufSize, numWorkers, WaitStrategyType.LITE_BLOCKING_WAIT, threadFactory);
        }
        this.batchingOpts = opts.getBatchingOptions();
        // allowBatching is true by default
        if (this.batchingOpts.isAllowBatching()) {
            this.getBatching = new GetBatching(KeyEvent::new, "get_batching",
                    new GetBatchingHandler("get", false));
            this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
                    new GetBatchingHandler("get_only_safe", true));
            this.putBatching = new PutBatching(KVEvent::new, "put_batching",
                    new PutBatchingHandler("put"));
        }
        LOG.info("[DefaultRheaKVStore] start successfully, options: {}.", opts);
        return this.started = true;
    }
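
The overridden getLeader above follows a local-first pattern: ask the local RegionEngine for the leader, and fall back to the generic lookup only when the local side has no answer. Below is a minimal sketch of that pattern. The names LeaderLookupSketch, localLeaders, and remoteLookup are hypothetical and are not part of RheaKV.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

final class LeaderLookupSketch {
    // regionId -> leader address known to this process
    // (hypothetical stand-in for what the local RegionEngine knows)
    private final Map<Long, String>    localLeaders = new HashMap<>();
    // slower fallback lookup (hypothetical), used only on a local miss
    private final LongFunction<String> remoteLookup;

    LeaderLookupSketch(final LongFunction<String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    void onLocalLeaderKnown(final long regionId, final String leader) {
        this.localLeaders.put(regionId, leader);
    }

    String getLeader(final long regionId) {
        final String local = this.localLeaders.get(regionId);
        // prefer the local answer; otherwise delegate to the fallback
        return local != null ? local : this.remoteLookup.apply(regionId);
    }

    public static void main(final String[] args) {
        final LeaderLookupSketch lookup = new LeaderLookupSketch(id -> "127.0.0.1:8183");
        lookup.onLocalLeaderKnown(1L, "127.0.0.1:8181");
        System.out.println(lookup.getLeader(1L)); // answered locally
        System.out.println(lookup.getLeader(2L)); // answered by the fallback
    }
}
```

The benefit of this ordering is that a node which already hosts the Region can answer without a network round trip.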

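(2) Initializing StoreEngine

Some of the code here mirrors what DefaultRheaKVStore's init method does and is not explained again; the rest is annotated with comments in the code block below.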

public synchronized boolean init(final StoreEngineOptions opts) {
        if (this.started) {
            LOG.info("[StoreEngine] already started.");
            return true;
        }
        DescriberManager.getInstance().addDescriber(this);
        this.storeOpts = Requires.requireNonNull(opts, "opts");
        Endpoint serverAddress = Requires.requireNonNull(opts.getServerAddress(), "opts.serverAddress");
        // get the port and IP
    	final int port = serverAddress.getPort();
        final String ip = serverAddress.getIp();
        // if the IP is null or equals Utils.IP_ANY, use the local host name as the serverAddress host
        if (ip == null || Utils.IP_ANY.equals(ip)) {
            serverAddress = new Endpoint(NetUtil.getLocalCanonicalHostName(), port);
            opts.setServerAddress(serverAddress);
        }
        // get the metrics report period
        final long metricsReportPeriod = opts.getMetricsReportPeriod();
        // initialize RegionEngineOptions
        List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        // if no RegionEngineOptions is configured, create a default one
        if (rOptsList == null || rOptsList.isEmpty()) {
            // -1 region
            final RegionEngineOptions rOpts = new RegionEngineOptions();
            rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
            rOptsList = Lists.newArrayList();
            rOptsList.add(rOpts);
            opts.setRegionEngineOptionsList(rOptsList);
        }
        // get the cluster name
        final String clusterName = this.pdClient.getClusterName();
        // iterate over rOptsList and fill in each RegionEngineOptions
        for (final RegionEngineOptions rOpts : rOptsList) {
            // the RaftGroupId is clusterName + "-" + regionId
            rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
            rOpts.setServerAddress(serverAddress);
            if (Strings.isBlank(rOpts.getInitialServerList())) {
                // if blank, extends parent's value
                rOpts.setInitialServerList(opts.getInitialServerList());
            }
            if (rOpts.getNodeOptions() == null) {
                // copy common node options
                rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
                    .getCommonNodeOptions().copy());
            }
            // if the region has no metrics report period of its own, inherit the store's
            if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
                // extends store opts
                rOpts.setMetricsReportPeriod(metricsReportPeriod);
            }
        }
        // initialize the Store and the Regions in it
        final Store store = this.pdClient.getStoreMetadata(opts);
        if (store == null || store.getRegions() == null || store.getRegions().isEmpty()) {
            LOG.error("Empty store metadata: {}.", store);
            return false;
        }
        this.storeId = store.getId();
        this.partRocksDBOptions = SystemPropertyUtil.getBoolean(PART_ROCKSDB_OPTIONS_KEY, false);
        // initialize the executors
        if (this.readIndexExecutor == null) {
            this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
        }
        if (this.raftStateTrigger == null) {
            this.raftStateTrigger = StoreEngineHelper.createRaftStateTrigger(opts.getLeaderStateTriggerCoreThreads());
        }
        if (this.snapshotExecutor == null) {
            this.snapshotExecutor = StoreEngineHelper.createSnapshotExecutor(opts.getSnapshotCoreThreads(),
                opts.getSnapshotMaxThreads());
        }
        // init rpc executors
        final boolean useSharedRpcExecutor = opts.isUseSharedRpcExecutor();
        // when no shared RPC executor is used, create dedicated executors that run the RpcServer's processors
        if (!useSharedRpcExecutor) {
            if (this.cliRpcExecutor == null) {
                this.cliRpcExecutor = StoreEngineHelper.createCliRpcExecutor(opts.getCliRpcCoreThreads());
            }
            if (this.raftRpcExecutor == null) {
                this.raftRpcExecutor = StoreEngineHelper.createRaftRpcExecutor(opts.getRaftRpcCoreThreads());
            }
            if (this.kvRpcExecutor == null) {
                this.kvRpcExecutor = StoreEngineHelper.createKvRpcExecutor(opts.getKvRpcCoreThreads());
            }
        }
        // start the metric reporters
        startMetricReporters(metricsReportPeriod);
        // initialize the rpcServer that other services call
        this.rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverAddress, this.raftRpcExecutor,
            this.cliRpcExecutor);
        // register the KV store request processors on the server
        StoreEngineHelper.addKvStoreRequestProcessor(this.rpcServer, this);
        if (!this.rpcServer.init(null)) {
            LOG.error("Fail to init [RpcServer].");
            return false;
        }
        // init db store
        // choose the DB implementation according to the storage type
        if (!initRawKVStore(opts)) {
            return false;
        }
        if (this.rawKVStore instanceof Describer) {
            DescriberManager.getInstance().addDescriber((Describer) this.rawKVStore);
        }
        // init all region engine
        // initialize a RegionEngine for each region
        if (!initAllRegionEngine(opts, store)) {
            LOG.error("Fail to init all [RegionEngine].");
            return false;
        }
        // heartbeat sender
        // when a real PD manages the cluster (RemotePlacementDriverClient), initialize the heartbeat sender
        if (this.pdClient instanceof RemotePlacementDriverClient) {
            HeartbeatOptions heartbeatOpts = opts.getHeartbeatOptions();
            if (heartbeatOpts == null) {
                heartbeatOpts = new HeartbeatOptions();
            }
            this.heartbeatSender = new HeartbeatSender(this);
            if (!this.heartbeatSender.init(heartbeatOpts)) {
                LOG.error("Fail to init [HeartbeatSender].");
                return false;
            }
        }
        this.startTime = System.currentTimeMillis();
        LOG.info("[StoreEngine] start successfully: {}.", this);
        return this.started = true;
    }

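(3) Initializing RegionEngineOptions

The StoreEngine initialization above also prepares the options for each RegionEngine. Let's take that part of the code out and look at it separately.

  1. Check whether opts.getRegionEngineOptionsList() is null or empty; if so, create a single default RegionEngineOptions.
  2. Iterate over every RegionEngineOptions in the list and set its parameters (RaftGroupId, ServerAddress, InitialServerList, NodeOptions, MetricsReportPeriod).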
        // init region options
        List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        if (rOptsList == null || rOptsList.isEmpty()) {
            // -1 region
            final RegionEngineOptions rOpts = new RegionEngineOptions();
            rOpts.setRegionId(Constants.DEFAULT_REGION_ID);
            rOptsList = Lists.newArrayList();
            rOptsList.add(rOpts);
            opts.setRegionEngineOptionsList(rOptsList);
        }
        final String clusterName = this.pdClient.getClusterName();
        for (final RegionEngineOptions rOpts : rOptsList) {
            rOpts.setRaftGroupId(JRaftHelper.getJRaftGroupId(clusterName, rOpts.getRegionId()));
            rOpts.setServerAddress(serverAddress);
            if (Strings.isBlank(rOpts.getInitialServerList())) {
                // if blank, extends parent's value
                rOpts.setInitialServerList(opts.getInitialServerList());
            }
            if (rOpts.getNodeOptions() == null) {
                // copy common node options
                rOpts.setNodeOptions(opts.getCommonNodeOptions() == null ? new NodeOptions() : opts
                    .getCommonNodeOptions().copy());
            }
            if (rOpts.getMetricsReportPeriod() <= 0 && metricsReportPeriod > 0) {
                // extends store opts
                rOpts.setMetricsReportPeriod(metricsReportPeriod);
            }
        }
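
The defaulting and inheritance rules in this fragment can be restated in a small self-contained sketch. RegionOptionsSketch and RegionOpts are hypothetical, simplified stand-ins for the real option classes. The rule that the group ID is the cluster name and region ID joined by a hyphen is taken from the walkthrough above, not checked against JRaftHelper.

```java
import java.util.ArrayList;
import java.util.List;

final class RegionOptionsSketch {
    // Hypothetical, simplified stand-in for RegionEngineOptions.
    static final class RegionOpts {
        long   regionId = -1; // -1 plays the role of the default region id
        String raftGroupId;
        String initialServerList;
        long   metricsReportPeriod;
    }

    static List<RegionOpts> prepare(final String clusterName, final String storeServerList,
                                    final long storeMetricsPeriod, final List<RegionOpts> configured) {
        List<RegionOpts> regions = configured;
        if (regions == null || regions.isEmpty()) {
            // nothing configured: fall back to a single default region
            regions = new ArrayList<>();
            regions.add(new RegionOpts());
        }
        for (final RegionOpts r : regions) {
            r.raftGroupId = clusterName + "-" + r.regionId;
            if (r.initialServerList == null || r.initialServerList.trim().isEmpty()) {
                r.initialServerList = storeServerList; // inherit the store-level value
            }
            if (r.metricsReportPeriod <= 0 && storeMetricsPeriod > 0) {
                r.metricsReportPeriod = storeMetricsPeriod; // inherit the store-level value
            }
        }
        return regions;
    }

    public static void main(final String[] args) {
        final List<RegionOpts> regions = prepare("rhea_example", "127.0.0.1:8181,127.0.0.1:8182", 30, null);
        final RegionOpts r = regions.get(0);
        System.out.println(r.raftGroupId + " " + r.initialServerList + " " + r.metricsReportPeriod);
    }
}
```

Region-level values win whenever they are set; the store-level values only fill the gaps.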

(4) Initializing Store

The Store is initialized by calling pdClient's getStoreMetadata method:

final Store store = this.pdClient.getStoreMetadata(opts);

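When FakePlacementDriverClient#getStoreMetadata is called:

  1. Get the list of RegionEngineOptions prepared earlier.
  2. Create a Region list with the same capacity as the RegionEngineOptions list, because one Store holds multiple Regions, as the architecture diagram earlier shows.
  3. Call getLocalRegionMetadata for each element of the RegionEngineOptions list and add the result to the Region list.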
    public Store getStoreMetadata(final StoreEngineOptions opts) {
        final Store store = new Store();
        final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        final List<Region> regionList = Lists.newArrayListWithCapacity(rOptsList.size());
        store.setId(-1);
        store.setEndpoint(opts.getServerAddress());
        for (final RegionEngineOptions rOpts : rOptsList) {
            regionList.add(getLocalRegionMetadata(rOpts));
        }
        store.setRegions(regionList);
        return store;
    }

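Now let's look at AbstractPlacementDriverClient#getLocalRegionMetadata:

  1. Ensure that regionId is within the valid range.
  2. Set the key range (left-closed, right-open).
  3. Convert initialServerList into peer objects.
  4. Add the Region to the regionRouteTable routing table.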
    protected Region getLocalRegionMetadata(final RegionEngineOptions opts) {
        final long regionId = Requires.requireNonNull(opts.getRegionId(), "opts.regionId");
        Requires.requireTrue(regionId >= Region.MIN_ID_WITH_MANUAL_CONF, "opts.regionId must >= "
                                                                         + Region.MIN_ID_WITH_MANUAL_CONF);
        Requires.requireTrue(regionId < Region.MAX_ID_WITH_MANUAL_CONF, "opts.regionId must < "
                                                                        + Region.MAX_ID_WITH_MANUAL_CONF);
        final byte[] startKey = opts.getStartKeyBytes();
        final byte[] endKey = opts.getEndKeyBytes();
        final String initialServerList = opts.getInitialServerList();
        final Region region = new Region();
        final Configuration conf = new Configuration();
        // region
        region.setId(regionId);
        region.setStartKey(startKey);
        region.setEndKey(endKey);
        region.setRegionEpoch(new RegionEpoch(-1, -1));
        // peers
        Requires.requireTrue(Strings.isNotBlank(initialServerList), "opts.initialServerList is blank");
        conf.parse(initialServerList);
        region.setPeers(JRaftHelper.toPeerList(conf.listPeers()));
        this.regionRouteTable.addOrUpdateRegion(region);
        return region;
    }
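
To illustrate the "initialServerList to peers" step, here is a minimal sketch that parses a comma-separated host:port list like the one used by the example servers. ServerListSketch and Peer are hypothetical. JRaft's Configuration.parse may accept a richer peer format, which this sketch does not attempt to handle.

```java
import java.util.ArrayList;
import java.util.List;

final class ServerListSketch {
    // Hypothetical, simplified peer: just host and port.
    static final class Peer {
        final String host;
        final int    port;

        Peer(final String host, final int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return this.host + ":" + this.port;
        }
    }

    static List<Peer> parse(final String serverList) {
        final List<Peer> peers = new ArrayList<>();
        for (final String item : serverList.split(",")) {
            final String trimmed = item.trim();
            final int idx = trimmed.lastIndexOf(':');
            if (idx <= 0 || idx == trimmed.length() - 1) {
                throw new IllegalArgumentException("bad peer: " + item);
            }
            peers.add(new Peer(trimmed.substring(0, idx), Integer.parseInt(trimmed.substring(idx + 1))));
        }
        return peers;
    }

    // True if the given host:port appears in the list.
    static boolean contains(final String serverList, final String address) {
        for (final Peer p : parse(serverList)) {
            if (p.toString().equals(address)) {
                return true;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        final String list = "127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183";
        System.out.println(parse(list));
        System.out.println(contains(list, "127.0.0.1:8182"));
    }
}
```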

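RegionEpoch involves two version numbers:
(1) confVer: the configuration version, incremented when a peer is added or removed
(2) version: the Region version, incremented when the Region splits or merges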

    public RegionEpoch(long confVer, long version) {
        this.confVer = confVer;
        this.version = version;
    }
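
One way these two counters can be used to tell whether a piece of Region metadata is out of date is to order epochs by version first and confVer second. The sketch below assumes that ordering. EpochSketch and its method names are hypothetical, and the comparison rule is an illustration rather than a statement of how RheaKV's RegionEpoch compares.

```java
final class EpochSketch implements Comparable<EpochSketch> {
    final long confVer; // bumped when a peer is added or removed
    final long version; // bumped when the region splits or merges

    EpochSketch(final long confVer, final long version) {
        this.confVer = confVer;
        this.version = version;
    }

    EpochSketch afterPeerChange() {
        return new EpochSketch(this.confVer + 1, this.version);
    }

    EpochSketch afterSplitOrMerge() {
        return new EpochSketch(this.confVer, this.version + 1);
    }

    // Assumed ordering: compare version first, then confVer.
    @Override
    public int compareTo(final EpochSketch other) {
        final int byVersion = Long.compare(this.version, other.version);
        return byVersion != 0 ? byVersion : Long.compare(this.confVer, other.confVer);
    }

    boolean isStaleComparedTo(final EpochSketch other) {
        return compareTo(other) < 0;
    }

    public static void main(final String[] args) {
        final EpochSketch cached = new EpochSketch(1, 1);
        final EpochSketch reported = cached.afterSplitOrMerge();
        System.out.println(cached.isStaleComparedTo(reported));
    }
}
```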

About RegionRouteTable#addOrUpdateRegion:

    public void addOrUpdateRegion(final Region region) {
        Requires.requireNonNull(region, "region");
        Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
        final long regionId = region.getId();
        final byte[] startKey = BytesUtil.nullToEmpty(region.getStartKey());
        final StampedLock stampedLock = this.stampedLock;
        final long stamp = stampedLock.writeLock();
        try {
            this.regionTable.put(regionId, region.copy());
            this.rangeTable.put(startKey, regionId);
        } finally {
            stampedLock.unlockWrite(stamp);
        }
    }
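
addOrUpdateRegion takes the StampedLock in write mode. For context, the sketch below puts that write side next to an optimistic read, the read mode that sets StampedLock apart from a plain read-write lock. KeyRangeSketch and its fields are hypothetical; the code shown in this article does not include RegionRouteTable's read paths, so this is only an illustration of the lock's API.

```java
import java.util.concurrent.locks.StampedLock;

final class KeyRangeSketch {
    private final StampedLock lock     = new StampedLock();
    private String            startKey = "";
    private String            endKey   = "";

    // Writers take the exclusive write lock, as addOrUpdateRegion does.
    void update(final String newStart, final String newEnd) {
        final long stamp = this.lock.writeLock();
        try {
            this.startKey = newStart;
            this.endKey = newEnd;
        } finally {
            this.lock.unlockWrite(stamp);
        }
    }

    // Readers first try an optimistic read, which does not block writers.
    String[] snapshot() {
        long stamp = this.lock.tryOptimisticRead();
        String s = this.startKey;
        String e = this.endKey;
        if (!this.lock.validate(stamp)) {
            // a write happened in between: retry under a real read lock
            stamp = this.lock.readLock();
            try {
                s = this.startKey;
                e = this.endKey;
            } finally {
                this.lock.unlockRead(stamp);
            }
        }
        return new String[] { s, e };
    }

    public static void main(final String[] args) {
        final KeyRangeSketch range = new KeyRangeSketch();
        range.update("a", "m");
        final String[] snap = range.snapshot();
        System.out.println("[" + snap[0] + ", " + snap[1] + ")");
    }
}
```

Only plain field loads sit between tryOptimisticRead and validate, which is what makes the optimistic section safe to discard and retry.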

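Let's look at several fields of RegionRouteTable:

  • keyBytesComparator: a LexicographicByteArrayComparator, which orders byte arrays lexicographically
  • stampedLock: a StampedLock, which supports optimistic reads and can outperform a plain read-write lock when reads dominate
  • rangeTable: a TreeMap (a NavigableMap implementation) sorted by keyBytesComparator, mapping <startKey, regionId>
  • regionTable: a HashMap mapping <regionId, region>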
    private static final Comparator<byte[]>  keyBytesComparator = BytesUtil.getDefaultByteArrayComparator();
    private final StampedLock                stampedLock        = new StampedLock();
    private final NavigableMap<byte[], Long> rangeTable         = new TreeMap<>(keyBytesComparator);
    private final Map<Long, Region>          regionTable        = Maps.newHashMap();
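
A sorted map keyed by startKey makes it cheap to find the Region that owns a key: because ranges are left-closed and right-open, the owner is the entry with the greatest startKey that is less than or equal to the key. Below is a minimal sketch of such a lookup. RangeLookupSketch, its comparator, and the sample keys are assumptions for illustration; only the <startKey, regionId> shape of rangeTable comes from the fields above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class RangeLookupSketch {
    // Unsigned lexicographic byte[] comparator
    // (a stand-in for BytesUtil.getDefaultByteArrayComparator()).
    static final Comparator<byte[]> LEXICOGRAPHIC = (a, b) -> {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    // startKey -> regionId, sorted by startKey
    private final NavigableMap<byte[], Long> rangeTable = new TreeMap<>(LEXICOGRAPHIC);

    void addRegion(final String startKey, final long regionId) {
        this.rangeTable.put(bytes(startKey), regionId);
    }

    // The owning region has the greatest startKey <= key.
    long findRegionId(final String key) {
        final Map.Entry<byte[], Long> entry = this.rangeTable.floorEntry(bytes(key));
        if (entry == null) {
            throw new IllegalStateException("no region covers key: " + key);
        }
        return entry.getValue();
    }

    static byte[] bytes(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        final RangeLookupSketch table = new RangeLookupSketch();
        table.addRegion("", 1L);  // region 1 covers ["", "g")
        table.addRegion("g", 2L); // region 2 covers ["g", "t")
        table.addRegion("t", 3L); // region 3 covers ["t", +inf)
        System.out.println(table.findRegionId("apple")); // 1
        System.out.println(table.findRegionId("g"));     // 2
        System.out.println(table.findRegionId("zebra")); // 3
    }
}
```

Keeping a second map from regionId to Region, as regionTable does, lets the range map store only small IDs while the full metadata lives in one place.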

(5) Initializing RegionEngine

StoreEngine#initAllRegionEngine

    private boolean initAllRegionEngine(final StoreEngineOptions opts, final Store store) {
        Requires.requireNonNull(opts, "opts");
        Requires.requireNonNull(store, "store");
        // get the base raft data directory
        String baseRaftDataPath = opts.getRaftDataPath();
        if (Strings.isNotBlank(baseRaftDataPath)) {
            try {
                FileUtils.forceMkdir(new File(baseRaftDataPath));
            } catch (final Throwable t) {
                LOG.error("Fail to make dir for raftDataPath: {}.", baseRaftDataPath);
                return false;
            }
        } else {
            baseRaftDataPath = "";
        }
        final Endpoint serverAddress = opts.getServerAddress();
        // get the RegionEngineOptions list and the Region list
        final List<RegionEngineOptions> rOptsList = opts.getRegionEngineOptionsList();
        final List<Region> regionList = store.getRegions();
        Requires.requireTrue(rOptsList.size() == regionList.size());
        for (int i = 0; i < rOptsList.size(); i++) {
            final RegionEngineOptions rOpts = rOptsList.get(i);
            if (!inConfiguration(rOpts.getServerAddress().toString(), rOpts.getInitialServerList())) {
                continue;
            }
            final Region region = regionList.get(i);
            // if the region has no raft data path, derive one from the region id and port
            if (Strings.isBlank(rOpts.getRaftDataPath())) {
                final String childPath = "raft_data_region_" + region.getId() + "_" + serverAddress.getPort();
                rOpts.setRaftDataPath(Paths.get(baseRaftDataPath, childPath).toString());
            }
            Requires.requireNonNull(region.getRegionEpoch(), "regionEpoch");
            // create the RegionEngine for this Region
            final RegionEngine engine = new RegionEngine(region, this);
            if (engine.init(rOpts)) {
                // each RegionKVService corresponds to one Region and only handles requests within that Region's range
                final RegionKVService regionKVService = new DefaultRegionKVService(engine);
                registerRegionKVService(regionKVService);
                // put the engine into the ConcurrentMap<Long, RegionEngine> regionEngineTable
                this.regionEngineTable.put(region.getId(), engine);
            } else {
                LOG.error("Fail to init [RegionEngine: {}].", region);
                return false;
            }
        }
        return true;
    }

RegionEngine#init

    public synchronized boolean init(final RegionEngineOptions opts) {
        if (this.started) {
            LOG.info("[RegionEngine: {}] already started.", this.region);
            return true;
        }
        this.regionOpts = Requires.requireNonNull(opts, "opts");
        // instantiate the state machine
        this.fsm = new KVStoreStateMachine(this.region, this.storeEngine);

        // node options
        NodeOptions nodeOpts = opts.getNodeOptions();
        if (nodeOpts == null) {
            nodeOpts = new NodeOptions();
        }
        // if the metrics report period is greater than zero, enable metrics
        final long metricsReportPeriod = opts.getMetricsReportPeriod();
        if (metricsReportPeriod > 0) {
            // metricsReportPeriod > 0 means enable metrics
            nodeOpts.setEnableMetrics(true);
        }
        final Configuration initialConf = new Configuration();
        if (!initialConf.parse(opts.getInitialServerList())) {
            LOG.error("Fail to parse initial configuration {}.", opts.getInitialServerList());
            return false;
        }
        // set the initial cluster configuration
        nodeOpts.setInitialConf(initialConf);
        nodeOpts.setFsm(this.fsm);
        // create the raft data directory and set the log, meta, and snapshot paths
        final String raftDataPath = opts.getRaftDataPath();
        try {
            FileUtils.forceMkdir(new File(raftDataPath));
        } catch (final Throwable t) {
            LOG.error("Fail to make dir for raftDataPath {}.", raftDataPath);
            return false;
        }
        if (Strings.isBlank(nodeOpts.getLogUri())) {
            final Path logUri = Paths.get(raftDataPath, "log");
            nodeOpts.setLogUri(logUri.toString());
        }
        if (Strings.isBlank(nodeOpts.getRaftMetaUri())) {
            final Path metaUri = Paths.get(raftDataPath, "meta");
            nodeOpts.setRaftMetaUri(metaUri.toString());
        }
        if (Strings.isBlank(nodeOpts.getSnapshotUri())) {
            final Path snapshotUri = Paths.get(raftDataPath, "snapshot");
            nodeOpts.setSnapshotUri(snapshotUri.toString());
        }
        LOG.info("[RegionEngine: {}], log uri: {}, raft meta uri: {}, snapshot uri: {}.", this.region,
            nodeOpts.getLogUri(), nodeOpts.getRaftMetaUri(), nodeOpts.getSnapshotUri());
        final Endpoint serverAddress = opts.getServerAddress();
        final PeerId serverId = new PeerId(serverAddress, 0);
        final RpcServer rpcServer = this.storeEngine.getRpcServer();
        this.raftGroupService = new RaftGroupService(opts.getRaftGroupId(), serverId, nodeOpts, rpcServer, true);
        this.node = this.raftGroupService.start(false);
        // the Raft node was started above; record the group's initial configuration in the route table
        RouteTable.getInstance().updateConfiguration(this.raftGroupService.getGroupId(), nodeOpts.getInitialConf());
        if (this.node != null) {
            final RawKVStore rawKVStore = this.storeEngine.getRawKVStore();
            final Executor readIndexExecutor = this.storeEngine.getReadIndexExecutor();
            // RaftRawKVStore is RheaKV's RawKVStore implementation on top of the Raft replicated state machine KVStoreStateMachine
            // it is RheaKV's entry point into Raft; the Raft flow starts here
            this.raftRawKVStore = new RaftRawKVStore(this.node, rawKVStore, readIndexExecutor);
            // wraps the store to intercept requests and record metrics
            this.metricsRawKVStore = new MetricsRawKVStore(this.region.getId(), this.raftRawKVStore);
            // metrics config
            if (this.regionMetricsReporter == null && metricsReportPeriod > 0) {
                final MetricRegistry metricRegistry = this.node.getNodeMetrics().getMetricRegistry();
                if (metricRegistry != null) {
                    final ScheduledExecutorService scheduler = this.storeEngine.getMetricsScheduler();
                    // start raft node metrics reporter
                    this.regionMetricsReporter = Slf4jReporter.forRegistry(metricRegistry) //
                        .prefixedWith("region_" + this.region.getId()) //
                        .withLoggingLevel(Slf4jReporter.LoggingLevel.INFO) //
                        .outputTo(LOG) //
                        .scheduleOn(scheduler) //
                        .shutdownExecutorOnStop(scheduler != null) //
                        .build();
                    this.regionMetricsReporter.start(metricsReportPeriod, TimeUnit.SECONDS);
                }
            }
            this.started = true;
            LOG.info("[RegionEngine] start successfully: {}.", this);
        }
        return this.started;
    }
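
Putting the two methods together, each Region ends up with its own directory under the base raft data path, containing log, meta, and snapshot subdirectories. The sketch below only reproduces that path arithmetic. RaftPathSketch and the base directory name rhea_raft are hypothetical; the child directory pattern and the three subdirectory names are taken from the code above.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class RaftPathSketch {
    // Mirrors the "raft_data_region_<regionId>_<port>" naming used in initAllRegionEngine.
    static Path regionDir(final String baseRaftDataPath, final long regionId, final int port) {
        final String base = baseRaftDataPath == null ? "" : baseRaftDataPath;
        return Paths.get(base, "raft_data_region_" + regionId + "_" + port);
    }

    public static void main(final String[] args) {
        // "rhea_raft" is a made-up base directory; -1 is the default region id
        final Path dir = regionDir("rhea_raft", -1L, 8181);
        for (final String child : new String[] { "log", "meta", "snapshot" }) {
            System.out.println(dir.resolve(child));
        }
    }
}
```

Including the port in the directory name keeps several servers on one machine, as in the three-node example, from writing into the same directory.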
