12

Spring-data-redis + Lettuce 如何使用 Pipeline

 2 years ago
source link: https://segmentfault.com/a/1190000040822290
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Spring-data-redis + Lettuce 如何使用 Pipeline

发布于 51 分钟前

关于 spring-data-redis 和 lettuce,笔者写过不少文章:

最近,私信还有留言中,网友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通过抓包一看,并没有生效,这个如何配置才能生效呢?

首先,在上面的文章中,我们分析过 Spring-data-redis + Lettuce 的基本原理,在这种环境下 RedisTemplate 使用的连接内部包括:

  • asyncSharedConn:可以为空,如果开启了连接共享,则不为空,默认是开启的;所有 LettuceConnection 共享的 Redis 连接,对于每个 LettuceConnection 实际上都是同一个连接;用于执行简单命令,因为 Netty 客户端与 Redis 的单处理线程特性,共享同一个连接也是很快的。如果没开启连接共享,则这个字段为空,使用 asyncDedicatedConn 执行命令。
  • asyncDedicatedConn:私有连接,如果需要保持会话,执行事务,以及 Pipeline 命令,固定连接,则必须使用这个 asyncDedicatedConn 执行 Redis 命令。

execute(RedisCallback),流程是:

image

对于 executePipelined(RedisCallback),如果使用正确的话,会使用 asyncDedicatedConn 私有连接执行。那么怎么算使用正确呢?

需要使用回调的连接进行 Redis 调用,不能直接使用 redisTemplate 调用,否则 pipeline 不生效

Pipeline 生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.get("test".getBytes());
        connection.get("test2".getBytes());
        return null;
    }
});

Pipeline 不生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        redisTemplate.opsForValue().get("test");
        redisTemplate.opsForValue().get("test2");
        return null;
    }
});

这样我们就能使用保证 API 层正确使用 pipeline 了,但是默认配置的情况下, 底层还是没有执行 Pipeline,这是怎么回事呢?

Redis Pipeline 类比 Lettuce 中的 AutoFlushCommands

Redis Pipeline 是 Redis 中的 批量操作,它能将一组 Redis 命令进行组装,通过一次传输给 Redis 并返回结果集,大大减少了如果命令时一条条单独传输需要的 RTT 时间(包括 Redis 客户端,Redis 服务端切换系统调用发送接收数据的时间,以及网络传输时间)。

如果原来的命令是这么发送的:

Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4

那么使用 PIPELINE 之后,命令就是类似于这么发送的

Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4

我们可以看出,其实它的原理,就是客户端先将所有命令拼接在一起然后本地缓存起来,之后统一发到服务端,服务端执行所有命令之后,统一响应。

Lettuce 的连接有一个 AutoFlushCommands 配置,就是指在这个连接上执行的命令,如果发送到服务端。默认是 false,即收到一个命令就发到服务端一个。如果配置为 false,则将所有命令缓存起来,手动调用 flushCommands 的时候,将缓存的命令一起发到服务端,这样其实就是实现了 Pipeline。

配置 Spring-data-redis + Lettuce 使用 Pipeline

Spring-data-redis 从 2.3.0 版本开始,对于 Lettuce 也兼容了 Pipeline 配置,参考:

我们可以这样配置:

@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            //在 LettuceConnectionFactory 这个 Bean 初始化之后,设置 PipeliningFlushPolicy 为 flushOnClose
            if (bean instanceof LettuceConnectionFactory) {
                LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;
                lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
            }
            return bean;
        }
    };
}

我们来看下这个 PipeliningFlushPolicy 的源码就知道这个 flushOnClose 的意义:

public interface PipeliningFlushPolicy {
    //其实就是默认的每个命令都直接发到 Redis Server
    static PipeliningFlushPolicy flushEachCommand() {
        return FlushEachCommand.INSTANCE;
    }
    //在连接关闭的时候,将命令一起发到 Redis
    static PipeliningFlushPolicy flushOnClose() {
        return FlushOnClose.INSTANCE;
    }
    //手动设置在多少条命令之后,统一发到 Redis,但是同样的,连接关闭的时候也会发到 Redis
    static PipeliningFlushPolicy buffered(int bufferSize) {
        return () -> new BufferedFlushing(bufferSize);
    }
}

这三个类也都实现了 PipeliningFlushState 接口:

public interface PipeliningFlushState {
    //对于 executePipelined,刚开始就会调用 connection.openPipeline(); 开启 pipeline,里面会调用这个方法
    void onOpen(StatefulConnection<?, ?> connection);
    //对于 executePipelined 中的每个命令都会调用这个方法
    void onCommand(StatefulConnection<?, ?> connection);
    //在 executePipelined 的最后会调用 connection.closePipeline(),里面会调用这个方法
    void onClose(StatefulConnection<?, ?> connection);
}

默认的每个命令都直接发到 Redis Server 的实现是:其实就是方法里什么都不做。

private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
    INSTANCE;
    @Override
    public PipeliningFlushState newPipeline() {
        return INSTANCE;
    }
    @Override
    public void onOpen(StatefulConnection<?, ?> connection) {}
    @Override
    public void onCommand(StatefulConnection<?, ?> connection) {}
    @Override
    public void onClose(StatefulConnection<?, ?> connection) {}
}

对于 flushOnClose:

private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
    INSTANCE;
    @Override
    public PipeliningFlushState newPipeline() {
        return INSTANCE;
    }
    @Override
    public void onOpen(StatefulConnection<?, ?> connection) {
        //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis
        connection.setAutoFlushCommands(false);
    }
    @Override
    public void onCommand(StatefulConnection<?, ?> connection) {
        //收到命令时什么都不做
    }
    @Override
    public void onClose(StatefulConnection<?, ?> connection) {
        //在 pipeline 关闭的时候发送所有命令
        connection.flushCommands();
        //恢复默认配置,这样连接如果退回连接池不会影响后续使用
        connection.setAutoFlushCommands(true);
    }
}

对于 buffered:

private static class BufferedFlushing implements PipeliningFlushState {
    private final AtomicLong commands = new AtomicLong();
    private final int flushAfter;

    public BufferedFlushing(int flushAfter) {
        this.flushAfter = flushAfter;
    }

    @Override
    public void onOpen(StatefulConnection<?, ?> connection) {
        //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis
        connection.setAutoFlushCommands(false);
    }

    @Override
    public void onCommand(StatefulConnection<?, ?> connection) {
        //如果命令达到指定个数,就发到 Redis
        if (commands.incrementAndGet() % flushAfter == 0) {
            connection.flushCommands();
        }
    }

    @Override
    public void onClose(StatefulConnection<?, ?> connection) {
        //在 pipeline 关闭的时候发送所有命令
        connection.flushCommands();
        //恢复默认配置,这样连接如果退回连接池不会影响后续使用
        connection.setAutoFlushCommands(true);
    }
}

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK