2

Java集合 - ConcurrentHashMap - 真正的飞鱼

 1 year ago
source link: https://www.cnblogs.com/feiyu2/p/ConcurrentHashMap.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

介绍 ConcurrentHashMap

技术是为了解决问题而生的,ConcurrentHashMap 解决了多个线程同时操作一个 HashMap 时,可能出现的内部问题。当多个线程同时操作一个 HashMap 时,有可能会出现多线程同时修改一个共享变量(HashMap 类的成员变量),导致数据被覆盖,产生意想不到的错误。


ConcurrentHashMap 内部使用了锁和各种数据结构来保证访问 Map 是线程安全的。

ConcurrentHashMap 和 HashMap 底层的数组、链表结构几乎相同,底层对数据结构的操作思路是相同的。ConcurrentHashMap 除了数组 + 链表 + 红黑树的基本结构外,新增了转移节点结构(ForwardingNode)。


介绍转移节点(ForwardingNode)

转移节点是 ForwardingNode 结构, ForwardingNode 继承了 Node。ForwardingNode 节点的 hash 值固定为 -1。ForwardingNode 比 Node 多了一个 nextTable 成员变量,nextTable 成员变量的类型是 Node 数组。nextTable 成员变量是扩容之后的新数组。

如果数组在索引 i 上的结构是 ForwardingNode,那么表示这个哈希桶内的全部节点都已经转移到扩容之后的新数组,旧的哈希桶内的数据不能发生改变。转移节点(ForwardingNode)是为了保证 ConcurrentHashMap 扩容时的线程安全。保证了当一个哈希桶内的全部节点都已经转移到扩容之后的新数组后、扩容操作完成之前,旧的哈希桶内的数据不发生变化。

当一个哈希桶内的全部节点都已经转移到扩容之后的新数组后、扩容操作完成之前,如果有其他的线程执行 put 操作,需要将新增的节点 put 到旧的哈希桶内,那么这个线程会调用 helpTransfer() 方法帮助扩容。扩容完成之后,这个线程再将要新增的节点 put 到新的哈希桶内。

ConcurrentHashMap 的新增操作

ConcurrentHashMap 在 put 方法上对数据结构的操作思路和 HashMap 相同,但 ConcurrentHashMap 的 put() 方法写了很多保障线程安全的代码。当调用 ConcurrentHashMap 的 put() 方法时,put() 方法的处理逻辑如下:

  • 首先,如果 CHM 的成员变量 table 数组为空(null 或者 length 为 0),则调用 initTable() 方法初始化 table 数组。由于 initTable() 方法操作了共享变量,因此 initTable() 方法采用了一些手段来保证线程安全。
  • 接下来,它会调用 spread() 方法根据 key 计算出 hash 值,然后根据计算出的 hash 值计算出 key 对应的数组索引 i
  • 计算出 key 对应的数组索引 i 之后,它调用 tabAt() 方法,tabAt() 方法返回数组在索引 i 上的值。然后它根据数组在索引 i 上的值进行处理。由于 tabAt() 方法读取了共享变量 table 数组在索引 i 上的值,因此 tabAt() 方法调用 Unsafe 类的 get 方法保证数据的可见性:
    • 如果数组在索引 i 上的值为 null,则直接生成一个新的节点,并调用 casTabAt() 方法让 tab[i] 指向该节点。由于 casTabAt() 方法操作了共享变量 tab 数组,因此 casTabAt() 方法调用 Unsafe 类的 compareAndSwap 方法保证数据不被覆盖;
    • 如果数组在索引 i 上的值不为 null,则意味着需要解决 hash 冲突问题、扩容冲突问题。
  • 接上一步骤,如果数组在索引 i 上的值不为 null。
    • 如果索引 i 上的结构是转移节点(ForwardingNode 结构,节点的 hash 值为 -1,表明这个哈希桶内的全部节点都已经转移到扩容之后的新数组,旧的哈希桶内的数据不能发生改变),它就会调用 helpTransfer() 方法,helpTransfer() 方法会帮助扩容。扩容完成之后,它再将要新增的节点 put 到扩容之后的新数组中。
    • 如果索引 i 上的结构不是转移节点,那么它会使用 synchronized 关键字给索引 i 上的结构加锁,保证同时最多只有一个线程操作索引 i 上的结构。给索引 i 上的结构加锁之后,它会判断数组在索引 i 上的结构是链表 还是 红黑树,然后调用相应的新增代码。
      • 如果索引 i 上的结构是链表,则把新生成的节点加到链表的末尾;
      • 如果索引 i 上的结构是红黑树,那么使用红黑树方式新增。
  • 接上一步骤,如果索引 i 上的结构是普通链表,则把新生成的节点加到链表的末尾之后,需要判断是否需要将链表转为红黑树:
    • 如果链表的长度大于等于 8,并且数组的长度大于等于 64,则调用 treeifyBin() 将链表转为红黑树;
    • 如果链表的长度大于等于 8,但是数组的长度小于 64,则调用 tryPresize() 方法执行扩容操作;
    • 当红黑树中的节点个数小于等于 6 时,红黑树会转为链表。
  • 将节点加入 CHM 集合之后,put() 方法的最后一步是调用 addCount() 方法增加 ConcurrentHashMap 中元素个数的计数值。addCount() 方法的任务是:增加 ConcurrentHashMap 中元素的计数值。如果元素的数量超过了 ConcurrentHashMap 扩容的阈值(sizeCtl),那么就会调用 transfer() 方法执行扩容操作。如果此时有其他的线程已经在执行扩容操作,那么当前线程就协助扩容。

当调用 CHM 的 put() 方法时,如果 CHM 中已经存在要新增的 key,并且方法的入参 onlyIfAbsent 为 false,则替换旧值,并返回旧值。

ConcurrentHashMap 的扩容机制

ConcurrentHashMap 的扩容时机和 HashMap 相同,都是在 put() 方法的最后一步检查是否需要扩容。ConcurrentHashMap 扩容的方法叫做 transfer(),从 put() 方法的 addCount() 方法进去,就能找到 transfer() 方法。

如果 ConcurrentHashMap 中元素的数量超过了扩容的阈值(sizeCtl),那么它会调用 transfer() 方法执行扩容操作。ConcurrentHashMap 的扩容机制是扩容为原来容量的 2 倍。ConcurrentHashMap 扩容的处理逻辑和 HashMap 完全不同。


ConcurrentHashMap 扩容的大体思路如下:扩容需要把旧数组上的全部节点转移到扩容之后的新数组上,节点的转移是从数组的最后一个索引位置开始,一个索引一个索引进行的。每个线程一轮处理有限个数的哈希桶。当旧数组上的全部节点转移到扩容之后的新数组后,ConcurrentHashMap 的 table 成员变量指向扩容之后的新数组,扩容操作完成。transfer() 方法的处理逻辑如下:

  • 首先根据 CPU 核数和 table 数组的长度,计算当前线程一轮处理哈希桶的个数。ConcurrentHashMap 的 transferIndex 成员变量会记录下一轮 或者是 下一个线程要处理的哈希桶的索引值 + 1

    • 如果 CPU 核数为 1,那么当前线程一轮处理哈希桶的个数为 table 数组的长度;
    • 如果 CPU 核数大于 1,那么先计算 num = (tab.length >>> 3) / NCPU的值:
      • 如果 num 值大于等于 16,那么 num 值就是当前线程一轮处理哈希桶的个数;
      • 如果 num 值小于 16,那么当前线程一轮处理哈希桶的个数为 16。也就是说,线程一轮处理哈希桶的个数最小值为 16。
  • 领取完任务之后线程就开始处理哈希桶内的节点。节点的转移是从数组的最后一个索引位置开始,一个索引一个索引进行的。在转移索引 i 上的节点之前,它会使用 synchronized 关键字给索引 i 上的结构加锁,保证同时最多只有一个线程操作索引 i 上的结构。

  • 给索引 i 上的结构加锁之后,它会判断数组在索引 i 上的结构是链表 还是 红黑树,然后调用相应的节点转移代码。

    • 如果索引 i 上的结构是链表,它通过将节点 key 的 hash 值 和 数组的长度 n 做与运算获得 n 对应的二进制表示中的 1 这一位在 hash 值中是 0 还是 1,即 b = p.hash & n。获取到 b 之后,用 b 来判断要转移的节点是要挂到低位哈希桶里,还是挂到高位哈希桶里。遍历完链表,形成两个链表(低位链表、高位链表)之后,将链表的头节点赋值给对应的 tab[i]:

      • 如果 b 的值为 0,则要转移的节点挂到低位哈希桶里
      • 如果 b 的值非 0,则要转移的节点挂到高位哈希桶里
    • 如果索引 i 上的结构是红黑树那么使用红黑树方式转移节点。

  • 在将索引 i 上的全部节点转移到扩容之后的新数组后,它让旧数组 tab[i] 指向转移节点(ForwardingNode)。

  • 当旧数组上的全部节点转移到扩容之后的新数组后,ConcurrentHashMap 的 table 成员变量指向扩容之后的新数组,扩容操作完成。

介绍低位哈希桶、高位哈希桶:如果 ConcurrentHashMap 当前的数组长度为 n 时,一个节点的 key 对应的哈希桶索引为 i。那么 ConcurrentHashMap 扩容之后数组长度为 2n 时,这个节点的 key 对应的低位哈希桶的索引为 i,对应的高位哈希桶的索引为 i + n。


ConcurrentHashMap 支持多线程扩容:

  • 如果在扩容的过程中,有其他的线程执行新增操作,新增操作完成后,这个线程会调用 transfer() 方法协助扩容。
  • 如果一个线程在扩容时,有其他的线程执行新增操作,需要把节点 put 到索引为 i 的哈希桶内。其他的线程它发现索引 i 上的结构是转移节点(ForwardingNode 结构, 节点的 hash 值为 -1,表明这个哈希桶内的元素已经扩容迁移完成),那么这个线程它就会调用 helpTransfer() 方法,helpTransfer() 方法会调用 transfer() 帮助扩容。扩容完成之后,它再将要新增的节点 put 到扩容之后的新数组中。

ConcurrentHashMap 的查找操作

当调用 ConcurrentHashMap 的 get() 方法时,get() 方法的处理逻辑如下:

  • 首先,它会根据传入的 key 计算出 hash 值;然后根据计算出的 hash 值计算出 key 对应的数组索引 i
  • 计算出 key 对应的数组索引 i 之后,根据存储位置,从数组中取出对应的 Entry,然后通过 key 对象的 equals() 方法判断传入的 key 和 Entry 中的 key 是否相等:
    • 如果传入的 key 和 Entry 中的 key 相等,则查找操作完成,返回 Entry 中的 value;
    • 如果传入的 key 和 Entry 中的 key 不相等,则再判断数组在索引 i 上的结构是链表 还是 红黑树,然后调用相应的查找数据的方法。直到找到相等的 Entry 或者没有下一个 Entry 为止。

ConcurrentHashMap 的容量大小问题

ConcurrentHashMap 的数组长度总是为 2 的幂次方。不论传入的初始容量是否为 2 的幂次方,最终都会转化为 2 的幂次方。

ConcurrentHashMap 中根据 key 计算出 hash 值,然后根据计算出的 hash 值计算出 key 对应的数组索引 i:

  • 根据 key 计算处 hash 值:在计算 hash 值时,它先将 key 的 hashCode 值无符号右移 16 位,然后再和 key 的 hashCode 值做 异或 运算,即 hash = (hashCode >>> 16) ^ hashCode
  • 根据 hash 值计算出 key 对应的数组索引 i:在计算 key 对应的数组索引 i 时,它将 hash 值 和 数组的长度 - 1 做与运算获得 key 对应的数组索引 i,即 i = hash & (n - 1)

ConcurrentHashMap 的数组长度总是为 2 的幂次方设计的非常巧妙:

  • 在计算 hash 值时,它先将 key 的 hashCode 值无符号右移 16 位,然后再和 key 的 hashCode 值做 异或 运算,即 hash = (hashCode >>> 16) ^ hashCode。使 key 的 hashCode 值高 16 位的变化映射到低 16 位中,使 hashCode 值高 16 位也参与后续索引 i 的计算,减少了碰撞的可能性。
  • 在计算 key 对应的数组索引 i 时,它将 hash 值 和 数组的长度 - 1 做与运算获得 key 对应的数组索引 i,即 i = hash & (n - 1)。由于数组的长度 n 是 2 的幂次方,n - 1 可以保证它的二进制表示中的后几位都是 1,n 对应的二进制位及之前的位都是 0。因此,计算出的数组索引 i 和 hash 值的二进制表示中后几位有关,而与前面的二进制位无关
  • 当 b 是 2 的幂次方时,a % b == a & (b - 1)。CPU 处理位运算比处理数学运算的速度更快,效率更高。
  • 在 ConcurrentHashMap 扩容时,它通过将 key 的 hash 值 和 数组的长度 n 做与运算获得 n 对应的二进制表示中的 1 这一位在 hash 值中是 0 还是 1,即 b = p.hash & n。获取到 b 之后,用 b 来判断要转移的节点是要挂到低位哈希桶里,还是挂到高位哈希桶里:
    • 如果 b 的值为 0,则要转移的节点挂到低位哈希桶里
    • 如果 b 的值非 0,则要转移的节点挂到高位哈希桶里

ConcurrentHashMap 的计数

当调用 ConcurrentHashMap 的 put() 方法时,put() 方法的最后一步是调用 addCount() 方法。

addCount() 方法的任务是:增加 ConcurrentHashMap 中元素的计数值。如果元素的数量超过了 ConcurrentHashMap 扩容的阈值(sizeCtl),那么就会调用 transfer() 方法执行扩容操作。如果此时有其他的线程已经在执行扩容操作,那么当前线程就协助扩容。


ConcurrentHashMap 采用了一些数据结构和手段来支持高效的并发计数。ConcurrentHashMap 使用 long 类型的 baseCount 成员变量和 CounterCell 类型的 counterCells 数组来支持高效的并发计数。

  • baseCount 是基础的计数值。主要通过调用 Unsafe 类的 compareAndSwap 方法更新 baseCount 的值
  • counterCells 数组的使用:如果有多个线程调用 addCount() 方法增加元素的计数值,那么每个线程将要增加的计数值保存在 counterCells 数组中。当调用 ConcurrentHashMap 的 size() 方法获取元素个数时,size() 方法将循环遍历 counterCells 数组,累加计数值得到当时元素。

ConcurrentHashMap 的计数将线程竞争分散到 counterCells 数组的每一个元素,提高了并发计数的性能。

private transient volatile long baseCount;

// 如果 counterCells 数组不为空,则数组的长度为 2 的幂次方。
private transient volatile CounterCell[] counterCells;

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

__EOF__


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK