0

guava cache过期方案实践

 2 years ago
source link: https://segmentfault.com/a/1190000041072880
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

guava cache过期方案实践

只要是缓存,就必定有过期机制,guava 缓存过期分为以下三种:

  • expireAfterAccess: 数据在指定时间内没有被访问(读或写),则为过期数据,当没有数据或者读到过期数据时,只允许一个线程更新新数据时,其他线程阻塞等待该线程更新完成后,取最新的数据。

构造函数:

public CacheBuilder<K, V> expireAfterAccess(long duration, TimeUnit unit) {
  ...
  this.expireAfterAccessNanos = unit.toNanos(duration);
  return this;
}

构造函数设置了变量expireAfterAccessNanos的值。

  • expireAfterWrite:数据在指定时间内没有被更新(写入),则为过期数据,当没有数据或者读到过期数据时,只允许一个线程更新新数据时,其他线程阻塞等待该线程更新完成后,取最新的数据。

构造函数:

public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
  ...
  this.expireAfterWriteNanos = unit.toNanos(duration);
  return this;
}

构造函数设置了变量expireAfterWriteNanos的值。

  • refreshAfterWrite:数据在指定时间内没有被更新(写入),则为过期数据,当有线程正在更新(写入)新数据时,其他线程返回旧数据。

构造函数:

public CacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) {
  ...
  this.refreshNanos = unit.toNanos(duration);
  return this;
}

构造函数设置了变量refreshNanos的值。

  • expireAfterAccess和expireAfterWrite:
    当数据达到过期时间,限制只能有1个线程去执行数据刷新,其他请求阻塞等待这个刷新操作完成,对性能会有的损耗。
  • refreshAfterWrite:
    当数据达到过期时间,限制只能有1个线程去执行新值的加载,其他线程取旧值返回(也可设置异步获取新值,所有线程都返回旧值)。这样有效地可以减少等待和锁争用,所以refreshAfterWrite会比expireAfterWrite性能好。但是还是会有一个线程需要去执行刷新任务,而guava cache支持异步刷新,如果开启异步刷新,在该线程在提交异步刷新任务之后,也会返回旧值,性能上更优异。
    但是由于guava cache并不会定时清理的功能(主动),而是在查询数据时,一并做了过期检查和清理(被动)。那就会出现以下问题:数据如果隔了很长一段时间再去查询,得到的这个旧值可能来自于很长时间之前,这将会引发问题,对时效性要求高的场景可能会造成非常大的错误。

当缓存里没有要访问的数据时,不管设置的是哪个模式,所有的线程都会阻塞,通过锁来控制只会有一个线程去加载数据

首先要了解guava cache过期原理。

1. 整体方法

get方法:

class LocalCache<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {

    V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      checkNotNull(key);
      checkNotNull(loader);
      try {
        if (count != 0) { // read-volatile
          // don't call getLiveEntry, which would ignore loading values
          ReferenceEntry<K, V> e = getEntry(key, hash);
          if (e != null) {
            long now = map.ticker.read();
            V value = getLiveValue(e, now);
            if (value != null) {
              recordRead(e, now);
              statsCounter.recordHits(1);
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }

        // at this point e is either null or expired;
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
          throw new UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        postReadCleanup();
      }
    }
    
}

可以看到guava cache继承了ConcurrentHashMap,为了满足并发场景,核心的数据结构就是按照 ConcurrentHashMap 来的。

2. 简化方法

这里将方法简化为跟本次主题有关的几个关键步骤:

if (count != 0) {    // 当前缓存是否有数据
  ReferenceEntry<K, V> e = getEntry(key, hash);    // 取数据节点
  if (e != null) {                 
    V value = getLiveValue(e, now);    // 判断是否过期,过滤已过期数据,仅对expireAfterAccess或expireAfterWrite模式下设置的时间做判断
    if (value != null) {
      return scheduleRefresh(e, key, hash, value, now, loader);    // 是否需要刷新数据,仅在refreshAfterWrite模式下生
    }
    ValueReference<K, V> valueReference = e.getValueReference();
    if (valueReference.isLoading()) {   // 如果有其他线程正在加载/刷新数据
      return waitForLoadingValue(e, key, valueReference);    // 等待其他线程完成加载/刷新数据
    }        
  }
}
return lockedGetOrLoad(key, hash, loader);    // 加载/刷新数据

countcache的一个属性,被volatile修饰(volatile int count),保存的是当前缓存的数量。

  • 如果count == 0(没有缓存)或者根据key取不到Hash节点,则加锁并加载缓存lockedGetOrLoad)。
  • 如果取到Hash节点,则判断是否过期(getLiveValue),过滤掉已过期数据。

3. getLiveValue

V getLiveValue(ReferenceEntry<K, V> entry, long now) {
  if (entry.getKey() == null) {
    tryDrainReferenceQueues();
    return null;
  }
  V value = entry.getValueReference().get();
  if (value == null) {
    tryDrainReferenceQueues();
    return null;
  }

  if (map.isExpired(entry, now)) {
    tryExpireEntries(now);
    return null;
  }
  return value;
}

通过isExpired判断当前节点是否已过期:

boolean isExpired(ReferenceEntry<K, V> entry, long now) {
  checkNotNull(entry);
  if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
    return true;
  }
  if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
    return true;
  }
  return false;
}

isExpired只判断了expireAfterAccessNanos和,expireAfterWriteNanos两个时间,结合expireAfterAccessexpireAfterWriterefreshAfterWrite三个方法的构造函数,可以看到这方法并不去管refreshAfterWrite设置的时间,也就是如果过了expireAfterAccessexpireAfterWrite设置的时间,那数据就是过期了,否则就是没过期。
如果发现数据已过期,则会连带检查是否还有其他过期的数据(惰性删除):

void tryExpireEntries(long now) {
  if (tryLock()) {
    try {
      expireEntries(now);
    } finally {
      unlock();
      // don't call postWriteCleanup as we're in a read
    }
  }
}
  
void expireEntries(long now) {
  drainRecencyQueue();
  ReferenceEntry<K, V> e;
  while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
  while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
    if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
      throw new AssertionError();
    }
  }
}  

void drainRecencyQueue() {
  ReferenceEntry<K, V> e;
  while ((e = recencyQueue.poll()) != null) {
    if (accessQueue.contains(e)) {
      accessQueue.add(e);
    }
  }
}   

取最近范围&写入的数据,一一检查是否过期。

4. scheduleRefresh

V value = getLiveValue(e, now);
if (value != null) {
  return scheduleRefresh(e, key, hash, value, now, loader);
}

getLiveValue之后,如果结果不为null,则说明在expireAfterAccessexpireAfterWrite两个模式下没有过期(或者没有设置这两个模式的时间),但是不代表数据不会刷新,因为getLiveValue并没有判断refreshAfterWrite的过期时间,而是在scheduleRefresh方法里判断。

V scheduleRefresh(
    ReferenceEntry<K, V> entry,
    K key,
    int hash,
    V oldValue,
    long now,
    CacheLoader<? super K, V> loader) {
  if (map.refreshes()
      && (now - entry.getWriteTime() > map.refreshNanos)
      && !entry.getValueReference().isLoading()) {
    V newValue = refresh(key, hash, loader, true);
    if (newValue != null) {
      return newValue;
    }
  }
  return oldValue;
}

满足以下条件时,会刷新数据(刷新线程在同步刷新模式下,返回新值,异步刷新模式下可能会返回旧值),否则直接返回旧值:

  1. 设置了refreshAfterWrite时间refreshNanos
  2. 当前数据已过期。
  3. 没有其他线程正在刷新数据(!entry.getValueReference().isLoading())。

5. waitForLoadingValue

如果没有设置refreshAfterWrite,且数据已过期:

  1. 如果有其他线程正在刷新,则阻塞等待(通过future.get()阻塞)。
  2. 如果没有其他线程正在刷新,则加锁并刷新数据。

    ValueReference<K, V> valueReference = e.getValueReference();
    if (valueReference.isLoading()) {
      return waitForLoadingValue(e, key, valueReference);
    }  
    
    V waitForLoadingValue(ReferenceEntry<K, V> e, K key, ValueReference<K, V> valueReference)
     throws ExecutionException {
      if (!valueReference.isLoading()) {
     throw new AssertionError();
      }
    
      checkState(!Thread.holdsLock(e), "Recursive load of: %s", key);
      // don't consider expiration as we're concurrent with loading
      try {
     V value = valueReference.waitForValue();
     if (value == null) {
       throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
     }
     // re-read ticker now that loading has completed
     long now = map.ticker.read();
     recordRead(e, now);
     return value;
      } finally {
     statsCounter.recordMisses(1);
      }
    }
    
    public V waitForValue() throws ExecutionException {
      return getUninterruptibly(futureValue);
    }
    
    public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
      boolean interrupted = false;
      try {
     while (true) {
       try {
         return future.get();
       } catch (InterruptedException e) {
         interrupted = true;
       }
     }
      } finally {
     if (interrupted) {
       Thread.currentThread().interrupt();
     }
      }
    }

6. 加载数据

加载数据,最终就是调用不管是lockedGetOrLoad方法,还是scheduleRefresh中的refresh方法,最终调用的是CacheLoaderload/reload方法。
当缓存里没有要访问的数据时,不管设置的是哪个模式,都会进入到lockedGetOrLoad方法中:

  1. 通过锁争抢拥有加载数据的权利
  2. 抢到锁的数据,将节点状态设置为loading,并加载数据。
  3. 没抢到锁的数据,进入跟上一步一样的waitForLoadingValue方法,阻塞等到数据加载完成。
lock();
try {
  LoadingValueReference<K, V> loadingValueReference =
                new LoadingValueReference<K, V>(valueReference);
  e.setValueReference(loadingValueReference);

  if (createNewEntry) {
    loadingValueReference = new LoadingValueReference<K, V>();

    if (e == null) {
      e = newEntry(key, hash, first);
      e.setValueReference(loadingValueReference);
      table.set(index, e);
    } else {
      e.setValueReference(loadingValueReference);
    }
  }
} finally {
  unlock();
  postWriteCleanup();
}

if (createNewEntry) {
  try {
    // Synchronizes on the entry to allow failing fast when a recursive load is
    // detected. This may be circumvented when an entry is copied, but will fail fast most
    // of the time.
    synchronized (e) {
      return loadSync(key, hash, loadingValueReference, loader);
    }
  } finally {
    statsCounter.recordMisses(1);
  }
} else {
  // The entry already exists. Wait for loading.
  return waitForLoadingValue(e, key, valueReference);
}

通过以上分析,可以知道在判断缓存是否过期时,guava cache会分成两次独立的判断:

  1. 判断expireAfterAccessexpireAfterWrite
  2. 判断refreshAfterWrite

回到问题“refreshAfterWrite虽然提升了性能,但是除了同步加载模式下执行刷新的那个线程外,其他线程可能访问到过期已久的数据”。我们可以通过expireAfterWriterefreshAfterWrite结合的方案来解决:设置了refreshAfterWrite的过期时间的同时,可以再设置expireAfterWrite/expireAfterAccess的过期时间,expireAfterWrite/expireAfterAccess的时间要大于refreshAfterWrite的时间。
例如refreshAfterWrite的时间为5分钟,而expireAfterWrite的时间为30分钟,访问到过期数据时:

  1. 如果过期时间小于30分钟,则会进入scheduleRefresh方法,除刷新线程以外的其他线程直接返回旧值。
  2. 如果缓存数据长时间未被访问,过期时间超过30分钟,则数据会在getLiveValue方法中被过滤掉,除刷新线程以外,其他线程阻塞等待。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK