Java进阶—HashMap的线程不安全问题

我们知道HashMap是线程不安全的,具体表现在多线程操作同一个HashMap对象时,会造成死循环(环形链)、数据丢失、数据覆盖这些问题。其中死循环和数据丢失在JDK1.7中会出现但在JDK1.8中已经得到解决,然而JDK1.8中仍会有数据覆盖的问题以及size自增、modCount自增的线程安全问题。

环形链

首先分析一下在JDK1.7中出现的死循环(环形链)问题,这个问题的根源是发生在哈希数组扩容(resize)时,将旧哈希表中的数据拷贝到新哈希表的过程中(即transfer(Entry[] newTable, boolean rehash)这个方法),链表形成了一个首尾项链的环;当通过调用HashMap的get等方法去遍历这个链表时,会造成死循环。

先重新回顾一下源码分析—HashMap(JDK1.7)这篇文章里的transfer方法源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//将老的表中的数据拷贝到新的结构中  
void transfer(Entry[] newTable, boolean rehash) {
int newCapacity = newTable.length;//容量
for (Entry<K,V> e : table) { //遍历所有桶
while(null != e) { //遍历桶中所有元素(是一个链表)
Entry<K,V> next = e.next;
if (rehash) {//如果是重新Hash,则需要重新计算hash值
e.hash = null == e.key ? 0 : hash(e.key);
}
int i = indexFor(e.hash, newCapacity);//定位Hash桶
e.next = newTable[i];//元素连接到桶中,这里相当于单链表的插入,总是插入在最前面
newTable[i] = e;//newTable[i]的值总是最新插入的值
e = next;//继续下一个元素
}
}
}

这段代码的流程简单来说,就是遍历旧的的哈希数组,重新定位每个Entry的下标,并采用头插法将元素迁移到新数组中。

什么是头插法?就是在链表上插入节点时,总是放在头部位置,原来的头结点将会变成第二个节点,以此类推。

头插法是造成死循环的关键,下面对这个问题做一下过程分析。

分析环形链的产生过程

假设现在有两个线程A、B同时对下面这个HashMap进行扩容操作:

正常扩容后的结果是下面这样的:

但是当线程A在要执行上面transfer方法的newTable[i] = e这行代码时,由于CPU时间片耗尽,线程A被挂起。即如下图中位置所示:

此时线程A中:e=3、next=7、e.next=null

当线程A的时间片耗尽后,CPU开始执行线程B,并在线程B中成功的完成了数据迁移:

重点来了,根据Java内存模式可知,线程B执行完数据迁移后,此时堆中newTable和table都是最新的,也就是说:7.next=3、3.next=null。

随后线程A获得CPU时间片继续执行newTable[i] = e,将3放入新数组对应的位置,执行完此轮循环后线程A的情况如下:

接着继续执行下一轮循环,此时e=7,从主内存中读取e.next时发现主内存中7.next=3,于是乎next=3,并将7采用头插法的方式放入新数组中,最后e=3(e=next)并继续执行下一轮循环,结果如下:

接下来当执行完e.next=newTable[i]即3.next=7后,3和7之间就相互连接了,当执行完newTable[i]=e后,3被头插法重新插入到链表中。此外因为从堆中读取到的3.next=null,e将被赋值为null。执行结果如下图所示:

上面说了此时3.next=null即next=null,当执行完e=next后,将不会进行下一轮循环。到此线程A、B的扩容操作完成,很明显当线程A执行完后,HashMap中出现了环形结构,当以后对该HashMap进行操作时会出现死循环。

并且从上图可以发现,元素5在扩容期间被莫名的丢失了,这就发生了数据丢失的问题。

数据覆盖

在JDK1.8中已经不会出现环形链的问题。分析JDK1.8中HashMap的源码,已经没有了tranfer方法,这个方法的逻辑直接写在resize方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
final Node<K,V>[] resize() {
//oldTab 旧的哈希数组,oldCap 旧的数组的长度,oldThr 旧的阈值,newCap 新的数组长度,newThr 新的阈值
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
//oldCap > 0 说明不是在初始化数组
if (oldCap > 0) {
//数组已经到最大长度,将阈值改为最大值,且不再扩容
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
//扩容两倍后的长度<最大值,并且old长度也>=16,new长度=old长度2倍
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; //阈值也扩容两倍
} else if (oldThr > 0) // old阈值有过初始化的情况
newCap = oldThr;
else { // old长度、old阈值都为0的情况,即默认初始化
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; //新建一个数组,并赋值给table属性
table = newTab;
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e; //临时节点
if ((e = oldTab[j]) != null) {
oldTab[j] = null; //旧数组每个位置的元素置为null,帮助GC回收对象
if (e.next == null) //oldTab[j]处只有一个节点的情况
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode) //红黑树的情形
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { //链表情形
Node<K,V> loHead = null, loTail = null; //要放在旧索引位置的节点,这些节点在新数组的索引仍为j
Node<K,V> hiHead = null, hiTail = null; //要放在新索引位置的节点,这些节点在新数组的索引将会变成:j + oldCap
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null) // 如果loTail为空, 代表该节点为第一个节点
loHead = e; // 则将loHead赋值为第一个节点
else
loTail.next = e; // 否则将节点添加在loTail后面
loTail = e; // 并将loTail赋值为新增的节点
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
//如果loTail不为空(说明扩容有节点的数组索引j没变),则将最后一个节点的next设为空,并在新数组索引=j处设置对应的头节点为loHead。
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}

//如果hiTail不为空(说明扩容有节点的数组索引j发生变化),则将最后一个节点的next设为空,并在新数组索引=j + oldCap处设置对应的头节点为hiHead。
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}

从这段代码看出,如果哈希数组中元素如果是链表,扩容时采用的是尾插法,不管是将量表插入到index=j还是index=j+oldCap处,顺序是不变的,并且最后都会设置loTail.next = null;或者loTail.next = null;,这直接解决了环形链的问题。但在put方法中仍会存在数据覆盖的问题,在下图所示的代码处,假设两个线程A、B都在进行put操作,并且hash函数计算出的插入下标是相同的,当线程A在要执行这行代码时由于时间片耗尽导致被挂起,而线程B得到时间片后在该下标处插入了元素,完成了正常的插入,然后线程A获得时间片,由于之前已经进行了hash碰撞的判断,所有此时不会再进行判断,而是直接进行插入,这就导致了线程B插入的数据被线程A覆盖了,从而线程不安全。

线程安全的替代方案

因为HashMap是线程不安全的,如果需要线程安全的Map,可以使用HashTableCurrentHashMap或者Collections.synchronizedMap(Map)替代。其中HashTableCollections.synchronizedMap(Map)效率比较低,而CurrentHashMap采用分段锁的设计,从而使得多个线程在访问该Map的不同段的Entry时可以获取不同的锁,从而提高了效率,更推荐使用。

------ 本文完 ------