HashMap详解
# HashMap
# 数据结构
数组+链表+(红黑树jdk>=8)
# 源码原理分析
# 重要成员变量
- DEFAULT_INITIAL_CAPACITY = 1 << 4; Hash表默认初始容量
- MAXIMUM_CAPACITY = 1 << 30; 最大Hash表容量
- DEFAULT_LOAD_FACTOR = 0.75f;默认加载因子
- TREEIFY_THRESHOLD = 8;链表转红黑树阈值
- UNTREEIFY_THRESHOLD = 6;红黑树转链表阈值
- MIN_TREEIFY_CAPACITY = 64;链表转红黑树时hash表最小容量阈值,达不到优先扩容。
# 内部的执行机制源码
HashMap是线程不安全的,不安全的具体原因就是在高并发场景下,扩容可能产生死锁(Jdk1.7存在)以及get操作可能带来的数据丢失。
# Jdk7-扩容死锁分析
死锁问题核心在于下面代码,多线程扩容导致形成的链表环!
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;
for (Entry<K,V> e : table) {
while(null != e) {
Entry<K,V> next = e.next;//第一行
if (rehash) {
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);//第二行
e.next = newTable[i];//第三行
newTable[i] = e;//第四行
e = next;//第五行
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
去掉了一些冗余的代码, 层次结构更加清晰了。
- 第一行:记录oldhash表中e.next
- 第二行:rehash计算出数组的位置(hash表中桶的位置)
- 第三行:e要插入链表的头部, 所以要先将e.next指向new hash表中的第一个元素
- 第四行:将e放入到new hash表的头部
- 第五行: 转移e到下一个节点, 继续循环下去
# 单线程扩容
假设:hash算法就是简单的key与length(数组长度)求余。hash表长度为2,如果不扩容, 那么元素key为3,5,7按照计算(key%table.length)的话都应该碰撞到table[1]上。
扩容:hash表长度会扩容为4重新hash,key=3 会落到table[3]上(3%4=3), 当前e.next为key(7), 继续while循环重新hash,key=7 会落到table[3]上(7%4=3), 产生碰撞, 这里采用的是头插入法,所以key=7的Entry会排在key=3前面(这里可以具体看while语句中代码)当前e.next为key(5), 继续while循环重新hash,key=5 会落到table[1]上(5%4=3), 当前e.next为null, 跳出while循环,resize结束。
如下图所示
# 多线程扩容
下面就是多线程同时put的情况了, 然后同时进入transfer方法中:假设这里有两个线程同时执行了put()操作,并进入了transfer()环节
while(null != e) {
Entry<K,V> next = e.next;//第一行,线程1执行到此被调度挂起
int i = indexFor(e.hash, newCapacity);//第二行
e.next = newTable[i];//第三行
newTable[i] = e;//第四行
e = next;//第五行
}
2
3
4
5
6
7
第一步:线程启动,有线程T1和线程T2都准备对HashMap进行扩容操作, 此时T1和T2指向的都是链表的头节点A,而T1和T2的下一个节点分别是T1.next和T2.next,它们都指向B节点。
![image-20220712174846887](http://blogimg.gkmall.top/img/classless/202207121749187.png)
第二步:开始扩容,这时候,假设线程T2的时间片用完,进入了休眠状态,而线程T1开始执行扩容操作,一直到线程T1扩容完成后,线程T2才被唤醒。
![image-20220712174932008](http://blogimg.gkmall.top/img/classless/202207121749996.png)
T1完成扩容之后的场景就变成动画所示的这样
![image-20220712175008035](http://blogimg.gkmall.top/img/classless/202207121750133.png)
因为HashMap扩容采用的是头插法,线程T1执行之后,链表中的节点顺序发生了改变。但线程T2对于发生的一切还是不可知的,所以它指向的节点引用依然没变。如图所示,T2指向的是A节点,T2.next指向的是B节点。
![image-20220712175059676](http://blogimg.gkmall.top/img/classless/202207121751788.png)
当线程T1执行完成之后,线程T2恢复执行时,死循环就发生了。
![image-20220712175204762](http://blogimg.gkmall.top/img/classless/202207121752184.png)
因为T1执行完扩容之后,B节点的下一个节点是A,而T2线程指向的首节点是A,第二个节点是B,这个顺序刚好和T1扩容之前的节点顺序是相反的。T1执行完之后的顺序是B到A,而T2的顺序是A到B,这样A节点和B节点就形成了死循环。
![image-20220712175232700](http://blogimg.gkmall.top/img/classless/202207121752792.png)
# Jdk8-扩容
Java8 HashMap扩容跳过了Jdk7扩容的坑,对源码进行了优化,采用高低位拆分转移方式,避免了链表环的产生。
扩容前:
扩容后:
由于Jdk8引入了新的数据结构,所以put方法过程也有了一定改进,其过程如下图所示。
![Java8 HashMap](http://blogimg.gkmall.top/img/classless/202207011824454.png)
![0](http://blogimg.gkmall.top/img/classless/202207011824866.png)
# ConcurrentHashMap
# 数据结构
ConcurrentHashMap的数据结构与HashMap基本类似,区别在于:
- 内部在数据写入时加了同步机制(分段锁)保证线程安全,读操作是无锁操作;
- 扩容时老数据的转移是并发执行的,这样扩容的效率更高。
# 并发安全控制
Java7 ConcurrentHashMap基于ReentrantLock实现分段锁
Java8中 ConcurrentHashMap基于分段锁+CAS保证线程安全,分段锁基于synchronized关键字实现;
# ConcurrentHashMap源码分析
# 一、 数据结构
数组+链表+红黑树
![image.png](http://blogimg.gkmall.top/img/classless/202206281440749.png)
# 二、正确理解安全性
public class MyTest {
public static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) {
Integer count = map.get("count");
if (count == null) {
map.put("count", 1);
} else {
map.put("count",count + 1);
}
while (true) {
Integer count = map.get("count");
if (count == null) {
// 将存储到map中后,会返回null,代表成功
if (map.putIfAbsent("count", 1) == null) {
// putVal(key, value, true);
break;
}
} else {
if (map.replace("count", count, count + 1)) {
break;
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 三、put方法开始
从put方法开始,发现put方法调用了putVal(key, value, false);
putIfAbsent,他调用的方式是:putVal(key, value, true);.
final V putVal(K key, V value, boolean onlyIfAbsent) {
// key和value不允许为null~~~
if (key == null || value == null) throw new NullPointerException();
// 基于key计算出hash值
int hash = spread(key.hashCode());
// 声明了一个标识,暂时不管
int binCount = 0;
// 死循环,声明了一波tab,为当前ConcurrentHashMap的数组
for (Node<K,V>[] tab = table;;) {
// 因为Doug Lea老爷子,他的变量名都这样。
// tab-数组 n-数组长度 i-数据要存储的索引位置
// f-当前索引位置的数据
Node<K,V> f; int n, i, fh;
// 如果数组为null,或者数组的长度为0
if (tab == null || (n = tab.length) == 0)
// 初始化数组
tab = initTable();
// 数组已经初始化了,将数据插入的map中
// 将 数组长度-1 & key的hash值,作为索引 ,配合tabAt方法获取这个索引位置的数据
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 当前索引位置没有数据,将数据插入到这个位置
// 采用了CAS的方式,将数据插入,插入成功,返回true, 插入失败,返回false
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
// 成功!
break;
}
// 省略部分代码…………
}
tabAt(数组,i)-获取数组中i索引位置的数据(table[i])
casTabAt(数组,1,2,3)-以CAS的方式,将数组中i位置的数据从2修改为3
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 四、spread方法(散列算法、尽量避免Hash冲突)
// 调用方法拿hash值
int hash = spread(key.hashCode());
// 具体方法
static final int spread(int h) {
return (h ^ (h >>> 16));
}
00000110 00011000
^
00000110 00011000 00111010 00001110 - hash
&
00000000 00000000 00000000 00001111 - 15
(n - 1) & hash - 索引位置
将key的hashcode值的高16位与低16位进行^运算,将结果与数组的长度-1进行&运算,得到当前数据需要添加到哪个索引位置,因为这样做可以尽可能的打散数据,避免hash冲突
// ------------------------
return (h ^ (h >>> 16)) & HASH_BITS;
保证key的hashcode值一定是一个正数!
因为负数有特殊含义:
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 五、initTable方法
sizeCtl:
-1:代表map数组正在初始化
小于-1:代表正在扩容(-2,有1个线程正在扩容,-3,代表有2个线程正在扩容)
0:还没有初始化
正数:如果没有初始化,代表要初始化的长度。 如果已经初始化了,代表扩容的阈值。
// 初始化数组
private final Node<K,V>[] initTable() {
// …………
Node<K,V>[] tab; int sc;
// 判断数组初始化了嘛
while ((tab = table) == null || tab.length == 0) {
// sizeCtl复制给sc,并判断是否小于0
if ((sc = sizeCtl) < 0)
// 让一手~~~
Thread.yield();
// sizeCtl大于等于0,以CAS的方式,将sizeCtl设置为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 我要开始初始化了!!!
// 再次做了判断(单例模式懒汉的DCL)
if ((tab = table) == null || tab.length == 0) {
// 获取数组初始化的长度,如果sc>0,以sc作为长度,如果sc为0,就默认16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// table初始化完毕
table = tab = nt;
// 得到下次扩容的阈值,赋值给sc
sc = n - (n >>> 2);
}
} finally {
// 将sc复制给sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 六、hash冲突存储
final V putVal(K key, V value, boolean onlyIfAbsent) {
// …………
for (Node<K,V>[] tab = table;;) {
// …………
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
// 出现了hash冲突,需要将数据挂到链表上,或者添加到红黑树中
V oldVal = null;
// 锁住当前桶的位置
synchronized (f) {
// 拿到i索引位置的数据,判断跟锁是不是一个。
if (tabAt(tab, i) == f) {
// 当前桶下是链表,或者是空!
if (fh >= 0) {
// 标识设置为1
binCount = 1;
//
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 值一样!不是添加,是修改操作 String s = "123"; String s = new String("123");
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 获取到当前位置的value值
oldVal = e.val;
// 是否是IfAbsent,如果为false,不是,覆盖数据,如果为true,什么都不做,break
if (!onlyIfAbsent)
e.val = value;
break;
}
// 追加操作
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 如果next指向的是null,就直接插入到后面
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 说明桶下是棵树!
else if (f instanceof TreeBin) {
// 数据插入到红黑树
}
}
}
if (binCount != 0) {
// binCount是否大于等于8,如果大于等于8需要判断是否需要将链表转为红黑树
if (binCount >= TREEIFY_THRESHOLD)
// 尝试转红黑树
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
return null;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 七、treeifyBin方法
// 转红黑树
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// 获取数组长度是否小于64,
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 尝试扩容
tryPresize(n << 1);
// 转红黑树
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
//....
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
# 八、tryPresize方法
private final void tryPresize(int size) {
// -------------------------------------------------------
// 对扩容数组长度做一波判断,判断是否达到最大值,其次是保证是2的n次幂
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
// -------------------------------------------------------
int sc;
// 拿到sizeCtl,是否大于0
while ((sc = sizeCtl) >= 0) {
// 有两种可能,第一种没初始化数组(putAll方法),第二种数组已经初始化(正常捋顺过来的)
Node<K,V>[] tab = table; int n;
// 这里是初始化数组的操作
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// -------------------------------------------------------
// 如果扩容长度,已经小于扩容阈值。(扩容完事了~~~)
// 数组长度已经大于等于最大长度
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// -------------------------------------------------------
// 开始扩容
else if (tab == table) {
// 得到一个扩容戳(长度32位的数值,高16位做扩容标识,低16位做扩容线程数)
int rs = resizeStamp(n);
if (sc < 0) {
// sc小于0,已经开始扩容了,帮助扩容!
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 咱们没有线程扩容,我先设置sizeCtl标记,开始库哦哦让
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 重要成员变量
ConcurrentHashMap拥有出色的性能, 在真正掌握内部结构时, 先要掌握比较重要的成员:
- LOAD_FACTOR: 负载因子, 默认75%, 当table使用率达到75%时, 为减少table的hash碰撞, tabel长度将扩容一倍。负载因子计算: 元素总个数%table.lengh
- TREEIFY_THRESHOLD: 默认8, 当链表长度达到8时, 将结构转变为红黑树。
- UNTREEIFY_THRESHOLD: 默认6, 红黑树转变为链表的阈值。
- MIN_TRANSFER_STRIDE: 默认16, table扩容时, 每个线程最少迁移table的槽位个数。
- MOVED: 值为-1, 当Node.hash为MOVED时, 代表着table正在扩容
- TREEBIN, 置为-2, 代表此元素后接红黑树。
- nextTable: table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上。
- sizeCtl: 用来标志table初始化和扩容的,不同的取值代表着不同的含义:
- 0: table还没有被初始化
- -1: table正在初始化
- 小于-1: 实际值为resizeStamp(n)
- 大于0: 初始化完成后, 代表table最大存放元素的个数, 默认为0.75*n
- transferIndex: table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标
- ForwardingNode: 一个特殊的Node节点, 其hashcode=MOVED, 代表着此时table正在做扩容操作。扩容期间, 若table某个元素为null, 那么该元素设置为ForwardingNode, 当下个线程向这个元素插入数据时, 检查hashcode=MOVED, 就会帮着扩容。
ConcurrentHashMap由三部分构成, table+链表+红黑树, 其中table是一个数组, 既然是数组, 必须要在使用时确定数组的大小, 当table存放的元素过多时, 就需要扩容, 以减少碰撞发生次数, 本文就讲解扩容的过程。扩容检查主要发生在插入元素(putVal())的过程:
- 一个线程插完元素后, 检查table使用率, 若超过阈值, 调用transfer进行扩容
- 一个线程插入数据时, 发现table对应元素的hash=MOVED, 那么调用helpTransfer()协助扩容。
# 协助扩容helpTransfer
下面是协助扩容的过程
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { //table扩容
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据 length 得到一个标识符号
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {//说明还在扩容
//判断是否标志发生了变化|| 扩容结束了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
//达到最大的帮助线程 || 判断扩容转移下标是否在调整(扩容结束)
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
主要做了如下事情:
检查是否扩容完成
对sizeCtrl = sizeCtrl+1, 然后调用transfer()进行真正的扩容。
# 扩容transfer
扩容的整体步骤就是新建一个nextTab, size是之前的2倍, 将table上的非空元素迁移到nextTab上面去。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
//计算扩容间隔
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
// subdivide range,每个线程最少迁移16个槽位,大的话,最多
stride = MIN_TRANSFER_STRIDE;
//如果基于长度和CPU内核运算,得到的结果比16大,那么每次迁移数据的长度就用计算的值
//否则每次最少迁移16位
// initiating 才开始初始化新的nextTab,如果nexTab = null,代表我是第一个进来扩容的线程
if (nextTab == null) {
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //扩容2倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;//更新的转移下标,
}
//获取新数组长度
int nextn = nextTab.length;
//ForwardingNode在桶迁移完数据后,放置的节点,代表当前位置已经迁移完毕
//fwd的hash值为-1,代表当前桶迁移数据完毕
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//是否能够向前推进到下一个周期;
boolean advance = true;
// to ensure sweep before committing nextTab,完成状态,如果是,则结束此方法
boolean finishing = false;
//死循环 i代表桶的上限值 bound代表下限值
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) { //取下一个周期
int nextIndex, nextBound;
//本线程处理的区间范围为[bound, i),范围还没有处理完成,那么就继续处理
if (--i >= bound || finishing)
advance = false;
//目前处理到了这里(从大到小, 下线),开始找新的一轮的区间
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//这个条件改变的是transferIndex的值,从16变成了1
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
//nextBound 是这次迁移任务的边界,注意,是从后往前
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound; //一块区间最小桶的下标
i = nextIndex - 1; //能够处理的最大桶的下标
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) { //每个迁移线程都能达到这里
int sc;
if (finishing) { //迁移完成
nextTable = null;
//直接把以前的table丢弃了,上面的MOVE等标志全部丢弃,使用新的
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1); //扩大2n-0.5n = 1.50n, 更新新的容量阈值
return;
}
//表示当前线程迁移完成了
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//注意此时sc的值并不等于sizeCtl,上一步,sizeCtl=sizeCtl-1了。这两个对象还是分割的
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果对应位置为null, 则将ForwardingNode放在对应的地方
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) //别的线程已经在处理了,再推进一个下标
advance = true; // already processed,推动到下一个周期,仍然会检查i与bound是否结束
else { //说明位置上有值了,
//需要加锁,防止再向里面放值,在放数据时,也会锁住。比如整个table正在迁移,还没有迁移到这个元素,另外一个线程向这个节点插入数据,此时迁移到这里了,会被阻塞住
synchronized (f) {
if (tabAt(tab, i) == f) {//判断i下标和f是否相同
Node<K,V> ln, hn; //高位桶, 地位桶
if (fh >= 0) {
int runBit = fh & n;//n为2^n, 取余后只能是2^n
Node<K,V> lastRun = f;
///找到最后一个不和fn相同的节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
//只要找到这,之后的取值都是一样的,下次循环时,就不用再循环后面的
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else { //比如1,16,32,如果低位%16,那么肯定是0。
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
//这样就把相同串的给串起来了
ln = new Node<K,V>(ph, pk, pv, ln);
else
//这样就把相同串的给串起来了,注意这里ln用法,第一个next为null,烦着串起来了。
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); //反着给串起来了
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {// 如果是红黑树
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null; //也是高低节点
TreeNode<K,V> hi = null, hiTail = null;//也是高低节点
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) { //中序遍历红黑树
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) { //0的放低位
//注意这里p.prev = loTail,每一个p都是下一个的prev
if ((p.prev = loTail) == null)
lo = p; //把头记住
else
loTail.next = p; //上一次的p的next是这次的p
loTail = p; //把上次p给记住
++lc;
}
else { //高位
if ((p.prev = hiTail) == null)
hi = p; //把尾记住
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :// //判断是否需要转化为树
(hc != 0) ? new TreeBin<K,V>(lo) : t; //如果没有高低的话,则部分为两个树
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
其中有两个变量需要了解下:
- advance: 表示是否可以向下一个轮元素进行迁移。
- finishing: table所有元素是否迁移完成。
大致做了如下事情:
- 确定线程每轮迁移元素的个数stride, 比如进来一个线程, 确定扩容table下标为(a,b]之间元素, 下一个线程扩容(b,c]。这里对b-a或者c-b也是由最小值16限制的。 也就是说每个线程最少扩容连续16个table的元素。而标志当前迁移的下标保存在transferIndex里面。
- 检查nextTab是否完成初始化, 若没有的话, 说明是第一个迁移的线程, 先初始化nextTab, size是之前table的2倍。
- 进入while循环查找本轮迁移的table下标元素区间, 保存在(bound, i]中, 注意这里是半开半闭区间。
- 从i -> bound开始遍历table中每个元素, 这里是从大到小遍历的:
- 若该元素为空, 则向该元素标写入ForwardingNode, 然后检查下一个元素。 当别的线程向这个元素插入数据时, 根据这个标志符知道了table正在被别的线程迁移, 在putVal中就会调用helpTransfer帮着迁移。
- 若该元素的hash=MOVED, 代表次table正在处于迁移之中, 跳过。 按道理不会跑着这里的。
- 否则说明该元素跟着的是一个链表或者是个红黑树结构, 若hash>0, 则说明是个链表, 若f instanceof TreeBin, 则说明是个红黑树结构。
- 链表迁移原理如下: 遍历链表每个节点。 若节点的f.hash&n==0成立, 则将节点放在i, 否则, 则将节点放在n+i上面。
迁移前, 对该元素进行加锁。 遍历链表时, 这里使用lastRun变量, 保留的是上次hash的值, 假如整个链表全部节点f.hash&n==0, 那么第二次遍历, 只要找到lastRun的值, 那么认为之后的节点都是相同值, 减少了不必要的f.hash&n取值。遍历完所有的节点后, 此时形成了两条链表, ln存放的是f.hash&n=0的节点, hn存放的是非0的节点, 然后将ln存放在nextTable第i元素的位置, n+i存放在n+i的位置。
蓝色节点代表:f.hash&n==0, 绿色节点代表f.hash&n!=0。 最终蓝色的节点仍在存放在(0, n)范围里, 绿的节点存放在(n, 2n-1)的范围之内。
- 迁移链表和红黑树的原理是一样的, 在红黑树中, 我们记录了每个红黑树的first(这个节点不是hash最小的节点)和每个节点的next, 根据这两个元素, 我们可以访问红黑树所有的元素, 红黑树此时也是一个链表, 红黑树和链表迁移的过程一样。红黑树根据迁移后拆分成了hn和ln, 根据链表长度确定链表是红黑树结构还是退化为了链表。
4.如何确定table所有元素迁移完成:
//表示当前线程迁移完成了
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//注意此时sc的值并不等于sizeCtl,上一步,sizeCtl=sizeCtl-1了。这两个对象还是分割的
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
2
3
4
5
6
7
8
第一个线程开始迁移时, 设置了sizeCtl= resizeStamp(n) << RESIZE_STAMP_SHIFT+2, 此后每个新来帮助迁移的线程都会sizeCtl=sizeCtl+1, 完成迁移后,sizeCtl-1, 那么只要有一个线程还处于迁移状态, 那么sizeCtl> resizeStamp(n) << RESIZE_STAMP_SHIFT+2一直成立, 当只有最后一个线程完成迁移之后, 等式两边才成立。 可能大家会有疑问, 第一个线程并没有对sizeCtl=sizeCtl+1, 此时完成后再减一, 那不是不相等了吗, 注意这里, sizeCtl在减一前, 将值赋给了sc, 等式比较的是sc。
# 总结
table扩容过程就是将table元素迁移到新的table上, 在元素迁移时, 可以并发完成, 加快了迁移速度, 同时不至于阻塞线程。所有元素迁移完成后, 旧的table直接丢失, 直接使用新的table。
# CopyOnWrite机制
核心思想:读写分离,空间换时间,避免为保证并发安全导致的激烈的锁竞争。
划关键点:
1、CopyOnWrite适用于读多写少的情况,最大程度的提高读的效率;
2、CopyOnWrite是最终一致性,在写的过程中,原有的读的数据是不会发生更新的,只有新的读才能读到最新数据;
3、如何使其他线程能够及时读到新的数据,需要使用volatile变量;
4、写的时候不能并发写,需要对写操作进行加锁;
# 源码原理
写时复制
/*
* 添加元素api
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); //复制一个array副本
newElements[len] = e; //往副本里写入
setArray(newElements); //副本替换原本,成为新的原本
return true;
} finally {
lock.unlock();
}
}
//读api
public E get(int index) {
return get(getArray(), index); //无锁
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21