3

Java高性能缓存库Caffeine

 1 year ago
source link: https://jasonkayzk.github.io/2023/03/28/Java%E9%AB%98%E6%80%A7%E8%83%BD%E7%BC%93%E5%AD%98%E5%BA%93Caffeine/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Caffeine是基于Java的一个高性能本地缓存库,由Guava改进而来;

本文介绍了如何在Java中使用Caffeine缓存,以及如何在SpringBoot中集成Caffeine缓存;

Java高性能缓存库Caffeine

Caffeine简介

Caffeine是一个Java高性能的本地缓存库。其官方说明指出,其缓存命中率已经接近最优值。

实际上,Caffeine这样的本地缓存和ConcurrentMap很像:支持并发,并且支持O(1)时间复杂度的数据存取。二者的主要区别在于:

  • ConcurrentMap将存储所有存入的数据,直到你显式将其移除;
  • Caffeine将通过给定的配置,自动移除“不常用”的数据,以保持内存的合理占用。

因此,一种更好的理解方式是:

Cache是一种带有存储和移除策略的Map

Caffeine提供如下的一些功能:

- automatic loading of entries into the cache, optionally asynchronously
# 自动加载条目到缓存中,支持异步加载
- size-based eviction when a maximum is exceeded based on frequency and recency
# 根据频率和最近访问情况,支持将缓存数量设为移除策略
- time-based expiration of entries, measured since last access or last write
# 根据最近访问和修改时间,支持将时间设为移除策略
- asynchronously refresh when the first stale request for an entry occurs
# 过期条目再次访问时异步加载
- keys automatically wrapped in weak references
# key自动包装为弱引用
- values automatically wrapped in weak or soft references
# value自动包装为弱引用/软引用
- notification of evicted (or otherwise removed) entries
# 条目移除通知
- writes propagated to an external resource
# 对外部资源的写入
- accumulation of cache access statistics
# 累计缓存使用统计

Caffeine基本使用

在项目中添加依赖:

<dependency>
  <groupId>com.github.ben-manes.caffeine</groupId>
  <artifactId>caffeine</artifactId>
  <version>2.9.3</version>
</dependency>

本文基于 2.9.3 版本;

缓存类型

Caffeine提供了四种类型的Cache,对应着四种加载策略:

  • Cache;
  • LoadingCache;
  • AsyncCache;
  • AsyncLoadingCache;

下面分别来看;

Cache

最普通的一种缓存,无需指定加载方式,需要手动调用put()进行加载;

需要注意的是:put()方法对于已存在的key将进行覆盖,这点和Map的表现是一致的;

在获取缓存值时,如果想要在缓存值不存在时,原子地将值写入缓存,则可以调用get(key, k -> value)方法,该方法将避免写入竞争;

调用invalidate()方法,将手动移除缓存;

多线程情况下,当使用get(key, k -> value)时,如果有另一个线程同时调用本方法进行竞争,则后一线程会被阻塞,直到前一线程更新缓存完成;

而若另一线程调用getIfPresent()方法,则会立即返回null,不会被阻塞;

cache/caffeine/basic/src/main/java/io/github/jasonkayzk/type/CacheDemo.java

public class CacheDemo {

    public static void main(String[] args) {
        Cache<String, String> cache = Caffeine.newBuilder().build();

        System.out.println(cache.getIfPresent("123")); // null
        System.out.println(cache.get("123", k -> "456")); // 456
        System.out.println(cache.getIfPresent("123"));    // 456
        cache.put("123", "789");
        System.out.println(cache.getIfPresent("123"));    // 789
    }

}

LoadingCache

LoadingCache是一种自动加载的缓存;

和普通缓存不同的地方在于:当缓存不存在/缓存已过期时,若调用get()方法,则会自动调用CacheLoader.load()方法加载最新值;

调用getAll()方法将遍历所有的key调用get(),除非实现了CacheLoader.loadAll()方法。

使用LoadingCache时,需要指定CacheLoader,并实现其中的load()方法供缓存缺失时自动加载。

多线程情况下,当两个线程同时调用get(),则后一线程将被阻塞,直至前一线程更新缓存完成。

cache/caffeine/basic/src/main/java/io/github/jasonkayzk/type/LoadingCacheDemo.java

public class LoadingCacheDemo {

    public static void main(String[] args) {
        LoadingCache<String, String> cache = Caffeine.newBuilder()
                .build(new CacheLoader<String, String>() {
                    @Override
                    // 该方法必须实现
                    public String load(@NonNull String k) throws Exception {
                        return "456";
                    }

                    @Override
                    // 如果需要批量加载
                    public @NonNull Map<String, String> loadAll(@NonNull Iterable<? extends String> keys) throws Exception {
                        return new HashMap<String, String>() {
                        };
                    }
                });

        System.out.println(cache.getIfPresent("123")); // null
        System.out.println(cache.get("123"));          // 456
        System.out.println(cache.getAll(Arrays.asList("123", "456")));        // Map<String, String>
    }
}

AsyncCache

AsyncCache是Cache的一个变体,其响应结果均为CompletableFuture,通过这种方式,AsyncCache对异步编程模式进行了适配;

默认情况下,缓存计算使用ForkJoinPool.commonPool()作为线程池,如果想要指定线程池,则可以覆盖并实现Caffeine.executor(Executor)方法。

synchronous()提供了阻塞直到异步缓存生成完毕的能力,它将以Cache进行返回。

多线程情况下,当两个线程同时调用get(key, k -> value),则会返回同一个CompletableFuture对象。由于返回结果本身不进行阻塞,可以根据业务设计自行选择阻塞等待或者非阻塞。

cache/caffeine/basic/src/main/java/io/github/jasonkayzk/type/AsyncCacheDemo.java

public class AsyncCacheDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String key = "123";
        AsyncCache<String, String> cache = Caffeine.newBuilder().buildAsync();

        CompletableFuture<String> completableFuture = cache.get(key, k -> "456");
        System.out.println(completableFuture.get()); // 阻塞,直至缓存更新完成
    }
}

AsyncLoadingCache

显然这是Loading Cache和Async Cache的功能组合。AsyncLoadingCache支持以异步的方式,对缓存进行自动加载。

类似LoadingCache,同样需要指定CacheLoader,并实现其中的load()方法供缓存缺失是自动加载,该方法将自动在ForkJoinPool.commonPool()线程池中提交。如果想要指定Executor,则可以实现AsyncCacheLoader().asyncLoad()方法。

cache/caffeine/basic/src/main/java/io/github/jasonkayzk/type/AsyncLoadingCacheDemo.java

public class AsyncLoadingCacheDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String key = "123";

        AsyncLoadingCache<String, String> cache = Caffeine.newBuilder()
                .buildAsync(new AsyncCacheLoader<String, String>() {
                    @Override
                    // 自定义线程池加载
                    public @NonNull CompletableFuture<String> asyncLoad(@NonNull String key, @NonNull Executor executor) {
                        return CompletableFuture.completedFuture("456");
                    }
                });
//                .buildAsync(new CacheLoader<String, String>() {
//                    @Override
//                    // OR,使用默认线程池加载(二者选其一)
//                    public String load(@NonNull String key) throws Exception {
//                        return "456";
//                    }
//                });

        CompletableFuture<String> completableFuture = cache.get(key); // CompletableFuture<String>
        System.out.println(completableFuture.get());; // 阻塞,直至缓存更新完成
    }
}

驱逐策略

驱逐策略在创建缓存的时候进行指定;

常用的有:基于容量的驱逐和基于时间的驱逐;

  • 基于容量的驱逐:需要指定缓存容量的最大值;当缓存容量达到最大时,Caffeine将使用LRU策略对缓存进行淘汰;
  • 基于时间的驱逐:可以设置在最后访问/写入一个缓存经过指定时间后,自动进行淘汰;

驱逐策略可以组合使用,任意驱逐策略生效后,该缓存条目即被驱逐;

cache/caffeine/basic/src/main/java/io/github/jasonkayzk/evict/EvictDemo.java

public class EvictDemo {

    public static void main(String[] args) {
        // 创建一个最大容量为10的缓存
        Cache<String, String> cache1 = Caffeine.newBuilder().
                maximumSize(10).build();

        // 创建一个写入5s后过期的缓存
        Cache<String, String> cache2 = Caffeine.newBuilder().
                expireAfterWrite(5, TimeUnit.SECONDS).build();

        // 创建一个访问1s后过期的缓存
        Cache<String, String> cache3 = Caffeine.newBuilder().
                expireAfterAccess(1, TimeUnit.SECONDS).build();
    }
}

刷新机制

试想这样一种情况:当缓存运行过程中,有些缓存值我们需要定期进行刷新,以确保信息可以正确被同步到缓存中来;

我们当然可以使用基于时间的驱逐策略expireAfterWrite(),但带来的问题是:一旦缓存过期,下次重新加载缓存时将使得调用线程处于阻塞状态;

而使用刷新机制refreshAfterWrite(),Caffeine将在key允许刷新后的首次访问时,立即返回旧值,同时异步地对缓存值进行刷新,这使得调用方不至于因为缓存驱逐而被阻塞;

需要注意的是:刷新机制只支持LoadingCache和AsyncLoadingCache;

通过覆写CacheLoader.reload()方法,将在刷新时使得旧缓存值参与其中;

cache/caffeine/basic/src/main/java/io/github/jasonkayzk/refresh/RefreshDemo.java

public class RefreshDemo {

    public static void main(String[] args) {
        LoadingCache<String, String> cache1 = Caffeine.newBuilder().
                refreshAfterWrite(10, TimeUnit.MINUTES).
                build(RefreshDemo::create);
    }

    private static String create(String k) {
        return k;
    }
}

统计

Caffeine内置了数据收集功能,通过Caffeine.recordStats()方法,可以打开数据收集;

这样Cache.stats()方法将会返回当前缓存的一些统计指标,例如:

  • hitRate:查询缓存的命中率
  • evictionCount:被驱逐的缓存数量
  • averageLoadPenalty:新值被载入的平均耗时

cache/caffeine/basic/src/main/java/io/github/jasonkayzk/record/RecordDemo.java

public class RecordDemo {

    public static void main(String[] args) {
        // 获取统计指标
        Cache<String, String> cache = Caffeine.newBuilder().
                recordStats().build();
        System.out.println(cache.stats());
        System.out.println(cache.estimatedSize());
    }
}
CacheStats{hitCount=0, missCount=0, loadSuccessCount=0, loadFailureCount=0, totalLoadTime=0, evictionCount=0, evictionWeight=0}
0

SpringBoot中集成Caffeine

SpringBoot缓存管理器

Spring从3.1开始就引入了对Cache的支持。定义了org.springframework.cache.Cacheorg.springframework.cache.CacheManager接口,来统一不同的缓存技术,并支持使用JCache(JSR-107)注解来简化开发。

  • Cache接口包括了缓存的各种操作集合,实际操作缓存时,即通过这些接口进行操作。
  • Cache接口下Spring提供了各种xxxCache的实现。由于官方从SpringBoot 2.x后,将Caffeine代替Guava作为默认的缓存组件,因此这里我们需要用到的就是CaffeineCache这个类。如果需要自定义Cache实现,只需要实现Cache接口即可。
  • CacheManager 定义了创建、配置、获取、管理和控制多个唯一命名的 Cache。这些 Cache 存在于 CacheManager 的上下文中。

创建一个缓存管理器:

@Bean
public CacheManager cacheManager() {
    SimpleCacheManager cacheManager = new SimpleCacheManager();
    ArrayList<CaffeineCache> caches = new ArrayList<>();
    // String cacheName(): 创建缓存名称
    // Cache<Object, Object> generateCache(): 创建一个Caffeine缓存
    caches.add(new CaffeineCache(cacheName(), generateCache()));
    cacheManager.setCaches(caches);
    return cacheManager;
}

使用这种方式,可以同时在缓存管理器中添加多个缓存。需要注意的是,SimpleCacheManager只能使用Cache和LoadingCache,异步缓存将无法支持

使用@Cacheable相关注解

@Cacheable相关注解

添加完成缓存管理器后,我们可以方便地使用@Cacheable相关注解对缓存进行管理了。为了使用该注解,需要引入如下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
</dependency>

和@Cacheable相关的常用的注解包括:

  • @Cacheable:表示该方法支持缓存。当调用被注解的方法时,如果对应的键已经存在缓存,则不再执行方法体,而从缓存中直接返回。当方法返回null时,将不进行缓存操作。
  • @CachePut:表示执行该方法后,其值将作为最新结果更新到缓存中。每次都会执行该方法
  • @CacheEvict:表示执行该方法后,将触发缓存清除操作。
  • @Caching:用于组合前三个注解,例如:
@Caching(cacheable = @Cacheable("users"),
         evict = {@CacheEvict("cache2"), @CacheEvict(value = "cache3", allEntries = true)})
public User find(Integer id) {
    return null;
}

这类注解也同时可以标记在一个类上,表示该类的所有方法都支持对应的缓存注解。

常用注解属性

@Cacheable常用的注解属性如下:

  • cacheNames/value:缓存组件的名字,即cacheManager中缓存的名称。
  • key:缓存数据时使用的key。默认使用方法参数值,也可以使用SpEL表达式进行编写。
  • keyGenerator:和key二选一使用。
  • cacheManager:指定使用的缓存管理器。
  • condition:在方法执行开始前检查,在符合condition的情况下,进行缓存
  • unless:在方法执行完成后检查,在符合unless的情况下,不进行缓存
  • sync:是否使用同步模式。若使用同步模式,在多个线程同时对一个key进行load时,其他线程将被阻塞。

下面是一个注解使用示例:

@Cacheable(value = "UnitCache",
        key = "#unitType + T(top.kotoumi.constants.Constants).SPLIT_STR + #unitId",
        condition = "#unitType != 'weapon'")
public Unit getUnit(String unitType, String unitId) {
    return getUnit(unitType, unitId);
}

该方法使用的缓存为UnitCache,并且手动指定缓存的key是#unitType + Constants.SPLIT_STR + #unitId的拼接结果。该缓存将在#unitType != 'weapon'时生效。

缓存同步模式

@Cacheable注解支持配置同步模式。在不同的Caffeine配置下,对是否开启同步模式进行观察。

Caffeine缓存类型 是否开启同步 多线程读取不存在/已驱逐的key 多线程读取待刷新的key
Cache 各自独立执行被注解方法 -
Cache 线程1执行被注解方法,线程2被阻塞,直至缓存更新完成 -
LoadingCache 线程1执行load(),线程2被阻塞,直至缓存更新完成 线程1使用老值立即返回,并异步更新缓存值;线程2立即返回,不进行更新。
LoadingCache 线程1执行被注解方法,线程2被阻塞,直至缓存更新完成 线程1使用老值立即返回,并异步更新缓存值;线程2立即返回,不进行更新。

从上面的总结可以看到,sync开启或关闭,在Cache和LoadingCache中的表现是不一致的:

  • Cache中,sync表示是否需要所有线程同步等待
  • LoadingCache中,sync表示在读取不存在/已驱逐的key时,是否执行被注解方法

事实上,Cache AOP的读取流程中并没有进行加锁处理,这个参数的实际表现形式是由缓存实现方决定的。在使用Caffeine Cache时,可以根据上表,快速找到合适的组合方式。

Caffeine集成实战

引入相关依赖:

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-cache</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
</dependencies>

这一部分主要是通过使用 Caffeine 对 User 进行缓存;

User结构

User POJO 结构如下:

cache/caffeine/spring-boot/src/main/java/io/github/jasonkayzk/entity/User.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

    private String id;

    private String userType;

    private String password;

}

缓存组件

对于一个缓存,我们主要关心如下几个方面:

  • 缓存名称:在@Cacheable注解中使用;
  • 最大容量/缓存过期时间:驱逐策略使用;
  • 更新时间:更新策略使用;
  • 更新方法:用于CacheLoader.load()的实现;

因此可以定义一个基本配置类,使得我们实际使用的缓存配置都继承于它:

cache/caffeine/spring-boot/src/main/java/io/github/jasonkayzk/cache/BaseCaffeineCacheConfig.java

@Data
public abstract class BaseCaffeineCacheConfig {

    /**
     * 缓存名称
     */
    private String name = "caffeine";
    /**
     * 默认最大容量,大于0生效
     */
    private int maxSize = 100;
    /**
     * 缓存过期时间(秒),大于0生效
     */
    private int expireDuration = -1;
    /**
     * 缓存刷新时间(秒),大于0生效,且表示这是一个LoadingCache,否则表示是一个普通Cache
     */
    private int refreshDuration = -1;

    private Cache<Object, Object> cache;

    /**
     * 获取特定缓存值
     * @param key key
     * @return 缓存值
     */
    public abstract Object getValue(Object key);

}

实际缓存类:

cache/caffeine/spring-boot/src/main/java/io/github/jasonkayzk/cache/UserCache.java

@Service
@Getter
@EqualsAndHashCode(callSuper = true)
public class UserCache extends BaseCaffeineCacheConfig {

    /**
     * 缓存名称
     */
    private final String name = "UserCache";

    /**
     * 默认最大容量,大于0生效
     */
    private final int maxSize = 100;

    /**
     * 缓存过期时间(秒),大于0生效
     */
    private final int expireDuration = 86400;

    /**
     * 缓存刷新时间(秒),大于0生效,且表示这是一个LoadingCache,否则表示是一个普通Cache
     */
    private final int refreshDuration = 600;

    /**
     * 获取特定缓存值
     *
     * @param key key
     * @return 缓存值
     */
    public Object getValue(Object key) {
        String[] param = ((String) key).split(Constants.SPLIT_STR);
        return getUser(param[0], param[1]);
    }

    @Cacheable(value = "UserCache",
            key = "#userType + T(io.github.jasonkayzk.consts.Constants).SPLIT_STR + #userId",
            condition = "#userType != 'root'")
    public User getUser(String userType, String userId) {
        if ("1".equals(userId) && "admin".equals(userType)) {
            return new User("1", "admin", "admin-password");
        } else {
            return new User("999", "visitor", "no-password");
        }
    }

    @CacheEvict(value = "UserCache",
            key = "#userType + T(io.github.jasonkayzk.consts.Constants).SPLIT_STR + #userId",
            condition = "#userType != 'root'")
    public void deleteUser(String userType, String userId) {
    }
}

这里我们该类注解为@Service,方便后续调用方进行执行;

另外可以看到:@Cacheable注解在了实际业务使用的方法上,其缓存名正好是name自定指定的名称。另外,我们还需要使得getValue()方法调用被注解的业务方法,这一点是为了保证load()方法的正确执行;

在 SpringBoot 中注册:

cache/caffeine/spring-boot/src/main/java/io/github/jasonkayzk/cache/CaffeineCacheConfig.java

@Slf4j
@Configuration
@EnableCaching
public class CaffeineCacheConfig {

    Logger logger = LoggerFactory.getLogger(CaffeineCacheConfig.class);

    @Resource
    private UserCache userCache;

    @Bean
    public CacheManager cacheManager() {
        // 添加所有创建的缓存,这里仅添加一个用于示例
        List<BaseCaffeineCacheConfig> cacheConfigs = new ArrayList<>();
        cacheConfigs.add(userCache);

        // 创建缓存管理器下的所有缓存
        SimpleCacheManager cacheManager = new SimpleCacheManager();
        ArrayList<CaffeineCache> caches = new ArrayList<>();
        for (BaseCaffeineCacheConfig config : cacheConfigs) {
            caches.add(new CaffeineCache(config.getName(), generateCache(config)));
            logger.info("registered cache: {}", config.getName());
        }
        cacheManager.setCaches(caches);
        return cacheManager;
    }

    /**
     * 生成cache,并开启统计功能
     *
     * @param config 配置信息
     * @return cache
     */
    private Cache<Object, Object> generateCache(BaseCaffeineCacheConfig config) {
        // 创建缓存
        Cache<Object, Object> cache;
        Caffeine<Object, Object> builder = Caffeine.newBuilder().recordStats();
        if (config.getMaxSize() > 0) {
            builder.maximumSize(config.getMaxSize());
        }
        if (config.getExpireDuration() > 0) {
            builder.expireAfterWrite(config.getExpireDuration(), TimeUnit.SECONDS);
        }
        if (config.getRefreshDuration() > 0) {
            // 创建LoadingCache,需要传入CacheLoader
            builder.refreshAfterWrite(config.getRefreshDuration(), TimeUnit.SECONDS);
            cache = builder.build(cacheLoader(config));
        } else {
            // 创建普通Cache
            cache = builder.build();
        }
        config.setCache(cache);
        return cache;
    }

    /**
     * 构造cache loader
     *
     * @param config 配置
     * @return cache loader
     */
    private CacheLoader<Object, Object> cacheLoader(BaseCaffeineCacheConfig config) {
        // 使用配置类中的getValue()方法
        return config::getValue;
    }
}

缓存管理器中,使用@Resource注解获得缓存配置类的实例,并将使用该配置生成对应的Cache交由SpringBoot管理;

需要注意:由于只有LoadingCache才可以使用refreshAfterWrite()方法,因此需要根据传入参数确定生成LoadingCache还是普通Cache;

另外需要注意的是,如果需要开启缓存功能,需要在SpringBoot启动类或者任意配置类上使用@EnableCaching注解;

在控制器中使用缓存

cache/caffeine/spring-boot/src/main/java/io/github/jasonkayzk/controller/UserController.java

@Slf4j
@RestController
@RequestMapping("/user")
public class UserController {

    @Resource
    private UserCache userCache;

    Logger logger = LoggerFactory.getLogger(UserController.class);

    @GetMapping("/{userType}/{userId}")
    public User getUser(@PathVariable String userType,
                        @PathVariable String userId) {
        logger.info("getUser: userType: {}, userId: {}", userType, userId);

        userCache.getCache().asMap().forEach((key, user) -> logger.info(user.toString()));
        return userCache.getUser(userType, userId);
    }

    @DeleteMapping("/{userType}/{userId}")
    public void deleteUser(@PathVariable String userType,
                           @PathVariable String userId) {
        userCache.deleteUser(userType, userId);
    }

    @GetMapping("/cachestat")
    public String cacheStat() {
        // 获取Cache实例,并调用stat()方法查看缓存情况
        return userCache.getCache().stats().toString();
    }
}

注意:@Cacheable 注解是通过Spring AOP机制进行的,因此类内的调用将无法触发缓存操作,必须由外部进行调用;

同时注意到,在缓存配置类中增加了一个成员变量,并在generateCache()方法中将缓存对象传递过去:

public abstract class BaseCaffeineCacheConfig {
    private Cache<Object, Object> cache;
}

private Cache<Object, Object> generateCache(BaseCaffeineCacheConfig config) {
  config.setCache(cache);
  return cache;
}

就可以调用相关统计方法了;

测试

启动服务,首先访问:http://127.0.0.1:8848/user/root/1

由于在上面的配置中:

@Cacheable(value = "UserCache",
            key = "#userType + T(io.github.jasonkayzk.consts.Constants).SPLIT_STR + #userId",
            condition = "#userType != 'root'")

对于 userType==root 的不会缓存,因此返回默认的数据:

{"id":"999","userType":"visitor","password":"no-password"}

并且访问:http://127.0.0.1:8848/user/cachestat

显示此时没有缓存:

CacheStats{hitCount=0, missCount=0, loadSuccessCount=0, loadFailureCount=0, totalLoadTime=0, evictionCount=0, evictionWeight=0}

随后,访问:http://127.0.0.1:8848/user/admin/1,返回:

{"id":"1","userType":"admin","password":"admin-password"}

再访问:http://127.0.0.1:8848/user/visitor/1,返回:

{"id":"999","userType":"visitor","password":"no-password"}

再次访问:http://127.0.0.1:8848/user/cachestat,查看缓存状态:

CacheStats{hitCount=0, missCount=2, loadSuccessCount=2, loadFailureCount=0, totalLoadTime=395541, evictionCount=0, evictionWeight=0}

附录

参考文章:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK