40

SOFAJRaft—初次使用

 5 years ago
source link: https://www.tuicool.com/articles/aMrIfmq
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

SOFAJRaft 是基于 Raft 算法的生产级高性能 Java 实现,支持 MULTI-RAFT-GROUP。应用场景有 Leader 选举、分布式锁服务、高可靠的元信息管理、分布式存储系统。

如果不了解Raft算法的朋友可以去看看这篇文章: Raft 为什么是更易理解的分布式一致性算法 ,写的很详细了。

7VbmUn7.png!web

这张图是SOFAJRaft的设计图,其中Node 代表了一个 SOFAJRaft Server 节点。

由于SOFAJRaft的Node节点是一个分布式的结构,所以Node节点需要将信息传递给其他Node,所以Replicator的作用就是用来复制信息给其他的Node。多个Replicator共同组成一个ReplicatorGroup。

Snapshot是表示一个快照,就是对数据当前值的一个记录,会存盘保存,提供冷备数据功能。

Leader 生成快照有这么几个作用:

  • 当有新的 Node 加入集群的时候,不用只靠日志复制、回放去和 Leader 保持数据一致,而是通过安装 Leader 的快照来跳过早期大量日志的回放;
  • Leader 用快照替代 Log 复制可以减少网络上的数据量;
  • 用快照替代早期的 Log 可以节省存储空间;

StateMachine 接口是用来给用户去实现的部分。通过用户实现具体的业务逻辑从而在分布式系统中达成共识。

在 StateMachine 上,我们要去实现状态机暴露给我们待实现的几个接口,最重要的是 onApply 接口,要在这个接口里将 Cilent 的请求指令进行运算,转换成具体的计数器值。而 onSnapshotSave 和 onSnapshotLoad 接口则是负责快照的生成和加载。

Client也是需要用户去实现的部分,用户需要去定义不同的消息类型和客户端的处理逻辑。

实现Counter分布式计数器

下面我们给出个需求: 提供一个 Counter,Client 每次计数时可以指定步幅,也可以随时发起查询。

将它翻译成具体的功能点,主要有三部分:

  1. 实现:Counter server,具备计数功能,具体运算公式为:Cn = Cn-1 + delta;
  2. 提供写服务,写入 delta 触发计数器运算;
  3. 提供读服务,读取当前 Cn 值;

具体代码: Counter

在这个demo中,我们启动三个server作为一个group,传入下面的参数:

/tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
/tmp/server2 counter 127.0.0.1:8082 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
/tmp/server3 counter 127.0.0.1:8083 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

表示使用/tmp/server1 ,/tmp/server2,/tmp/server3三个目录用来存储数据,raft group名称为 counter,节点ip也分别为

127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

然后启动客户端,并传入下面参数:

counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

表示绑定的raft group名称为 counter,集群为:

127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083

服务端

CounterServer

public CounterServer(final String dataPath, final String groupId, final PeerId serverId,
                     final NodeOptions nodeOptions) throws IOException {
    // 初始化路径
    FileUtils.forceMkdir(new File(dataPath));

    // 这里让 raft RPC 和业务 RPC 使用同一个 RPC server, 通常也可以分开
    final RpcServer rpcServer = new RpcServer(serverId.getPort());
    RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
    // 注册业务处理器
    rpcServer.registerUserProcessor(new GetValueRequestProcessor(this));
    rpcServer.registerUserProcessor(new IncrementAndGetRequestProcessor(this));
    // 初始化状态机
    this.fsm = new CounterStateMachine();
    // 设置状态机到启动参数
    nodeOptions.setFsm(this.fsm);
    // 设置存储路径
    // 日志, 必须
    nodeOptions.setLogUri(dataPath + File.separator + "log");
    // 元信息, 必须
    nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta");
    // snapshot, 可选, 一般都推荐
    nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot");
    // 初始化 raft group 服务框架
    this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
    // 启动
    this.node = this.raftGroupService.start();
}

服务端CounterServer在实例化的时候会设置相应的处理器,这里设置了GetValueRequestProcessor和 IncrementAndGetRequestProcessor。

GetValueRequestProcessor用来提供读服务,读取当前 Cn 值;

IncrementAndGetRequestProcessor提供写服务,写入 delta 触发计数器运算;

GetValueRequestProcessor

@Override
public Object handleRequest(final BizContext bizCtx, final GetValueRequest request) throws Exception {
    if (!this.counterServer.getFsm().isLeader()) {
        return this.counterServer.redirect();
    }

    final ValueResponse response = new ValueResponse();
    response.setSuccess(true);
    response.setValue(this.counterServer.getFsm().getValue());
    return response;
}

GetValueRequestProcessor的处理非常的简单,直接获取状态机的值然后返回。

IncrementAndGetRequestProcessor

public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx,
                          final IncrementAndGetRequest request) {
    //判断当前节点是否是leader
    if (!this.counterServer.getFsm().isLeader()) {
        asyncCtx.sendResponse(this.counterServer.redirect());
        return;
    }
    //设置响应数据
    final ValueResponse response = new ValueResponse();
    //封装请求数据,并回调响应结果
    final IncrementAndAddClosure closure = new IncrementAndAddClosure(counterServer, request, response,
            status -> {
                //响应成功
                if (!status.isOk()) {
                    response.setErrorMsg(status.getErrorMsg());
                    response.setSuccess(false);
                }
                //发送响应请求
                asyncCtx.sendResponse(response);
            });

    try {
        final Task task = new Task();
        task.setDone(closure);
        //序列化请求
        task.setData(ByteBuffer
                .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2).serialize(request)));
        //调用node处理请求
        // apply task to raft group.
        counterServer.getNode().apply(task);
    } catch (final CodecException e) {
        LOG.error("Fail to encode IncrementAndGetRequest", e);
        //请求失败,则立即响应
        response.setSuccess(false);
        response.setErrorMsg(e.getMessage());
        asyncCtx.sendResponse(response);
    }
}

这里使用IncrementAndAddClosure来封装响应和请求,并通过回调的方式进行异步回写数据到client。然后实例化Task实例,序列化请求数据,调用node的apply方法。

然后设置了CounterStateMachine状态机,并设值了日志,元信息和快照的存储路径。

CounterStateMachine实现了StateMachineAdapter抽象类,并重写了3个方法:

onApply用来处理具体的业务

onSnapshotSave保存快照

onSnapshotLoad加载快照

在保存和加载快照的地方使用了CounterSnapshotFile类来进行辅助。

CounterStateMachine

public class CounterStateMachine extends StateMachineAdapter {
    ...
    private final AtomicLong    value      = new AtomicLong(0);
    
    public void onApply(final Iterator iter) {
        //获取processor中封装的数据
        while (iter.hasNext()) {
            long delta = 0;
    
            //用于封装请求数据和回调结果
            IncrementAndAddClosure closure = null;
            if (iter.done() != null) {
                // This task is applied by this node, get value from closure to avoid additional parsing.
                closure = (IncrementAndAddClosure) iter.done();
                delta = closure.getRequest().getDelta();
            } else {
                // Have to parse FetchAddRequest from this user log.
                final ByteBuffer data = iter.getData();
                try {
                    final IncrementAndGetRequest request = SerializerManager.getSerializer(SerializerManager.Hessian2)
                            .deserialize(data.array(), IncrementAndGetRequest.class.getName());
                    delta = request.getDelta();
                } catch (final CodecException e) {
                    LOG.error("Fail to decode IncrementAndGetRequest", e);
                }
            }
            //获取当前值
            final long prev = this.value.get();
            //将当前值加上delta
            final long updated = value.addAndGet(delta);
            //设置响应,并调用run方法回写响应方法
            if (closure != null) {
                closure.getResponse().setValue(updated);
                closure.getResponse().setSuccess(true);
                closure.run(Status.OK());
            }
            LOG.info("Added value={} by delta={} at logIndex={}", prev, delta, iter.getIndex());
            iter.next();
        }
    }
}

这里的onApply方法首先会获取processor中封装的数据,然后获取processor中传入的closure实例,然后处理好业务逻辑后调用closure的run进行回调返回数据到客户端。

客户端

CounterClient

public static void main(final String[] args) throws Exception {
    if (args.length != 2) {
        System.out.println("Useage : java com.alipay.sofa.jraft.example.counter.CounterClient {groupId} {conf}");
        System.out
            .println("Example: java com.alipay.sofa.jraft.example.counter.CounterClient counter 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083");
        System.exit(1);
    }
    final String groupId = args[0];
    final String confStr = args[1];

    final Configuration conf = new Configuration();
    if (!conf.parse(confStr)) {
        throw new IllegalArgumentException("Fail to parse conf:" + confStr);
    }
    // 更新raft group配置
    RouteTable.getInstance().updateConfiguration(groupId, conf);
    //接下来初始化 RPC 客户端并更新路由表
    final BoltCliClientService cliClientService = new BoltCliClientService();
    cliClientService.init(new CliOptions());

    if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
        throw new IllegalStateException("Refresh leader failed");
    }
    //获取 leader 后发送请求
    final PeerId leader = RouteTable.getInstance().selectLeader(groupId);
    System.out.println("Leader is " + leader);
    final int n = 1000;
    final CountDownLatch latch = new CountDownLatch(n);
    final long start = System.currentTimeMillis();
    for (int i = 0; i < n; i++) {
        incrementAndGet(cliClientService, leader, i, latch);
    }
    latch.await();
    System.out.println(n + " ops, cost : " + (System.currentTimeMillis() - start) + " ms.");
    System.exit(0);
}

客户端先是根据groupId和IP绑定server,然后更新路由表,获取leader

private static void incrementAndGet(final BoltCliClientService cliClientService, final PeerId leader,
                                    final long delta, CountDownLatch latch) throws RemotingException,
                                                                           InterruptedException {
    final IncrementAndGetRequest request = new IncrementAndGetRequest();
    request.setDelta(delta);
    cliClientService.getRpcClient().invokeWithCallback(leader.getEndpoint().toString(), request,
        new InvokeCallback() {

            @Override
            public void onResponse(Object result) {
                latch.countDown();
                System.out.println("incrementAndGet result:" + result);
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
                latch.countDown();

            }

            @Override
            public Executor getExecutor() {
                return null;
            }
        }, 5000);
}

然后调用incrementAndGet方法。incrementAndGet方法中使用cliClientService获取client然后传入request请求并设值回调函数。

总体流程

这里总结一下整个server和client的调用流程

77zEJbB.png!web

首先是CounterClient绑定server后,获取server的leader节点,然后发送一个IncrementAndGetRequest的request请求到server。

Server接收到请求后根据请求的类型交给IncrementAndGetRequestProcessor处理,并调用handleRequest方法。

然后handleRequest会将数据封装调用状态机的onApply方法,处理业务数据后调用closure进行回调。

closure回调后会封装一个ValueResponse发送响应请求给客户端。

客户端会回调onResponse方法。

到这里整个counter的例子就讲解完毕了


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK