ConcurrentSkipListMap数据结构
抓住了数据结构,对于理解整个ConcurrentSkipListMap有很重要的作用,其实,通过源码可知其数据结构如下。 可以看到ConcurrentSkipListMap的数据结构使用的是跳表,每一个HeadIndex、Index结点都会包含一个对Node的引用,同一垂直方向上的Index、HeadIndex结点都包含了最底层的Node结点的引用。并且层级越高,该层级的结点(HeadIndex和Index)数越少。Node结点之间使用单链表结构。
ConcurrentSkipListMap的继承关系
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>, Cloneable
, Serializable
{}
ConcurrentSkipListMap继承了AbstractMap抽象类,实现了ConcurrentNavigableMap接口,该接口定义了获取某一部分集合的操作。实现了Cloneable接口,表示允许克隆。实现了Serializable接口,表示可被序列化。
ConcurrentSkipListMap的内部类
ConcurrentSkipListMap包含了很多内部类,内部类的框架图如下: 其中,最为重要的类包括Index、HeadIndex、Node三个类。下面对这三个类进行逐一讲解
Index类
static class Index<K,V> {
final Node
<K,V> node
;
final Index
<K,V> down
;
volatile Index
<K,V> right
;
private static final sun
.misc
.Unsafe UNSAFE
;
private static final long rightOffset
;
static {
try {
UNSAFE
= sun
.misc
.Unsafe
.getUnsafe();
Class
<?> k
= Index
.class;
rightOffset
= UNSAFE
.objectFieldOffset
(k
.getDeclaredField("right"));
} catch (Exception e
) {
throw new Error(e
);
}
}
}
Index结点包括一个Node结点的引用,都是包含down域和right域,即对应数据结构中的Index结点。并且借助了反射来原子性的修改right域。
构造函数
Index(Node
<K,V> node
, Index
<K,V> down
, Index
<K,V> right
) {
this.node
= node
;
this.down
= down
;
this.right
= right
;
}
构造Index结点,确定Node引用,down域和right域
link函数
final boolean link(Index
<K,V> succ
, Index
<K,V> newSucc
) {
Node
<K,V> n
= node
;
newSucc
.right
= succ
;
return n
.value
!= null
&& casRight(succ
, newSucc
);
}
link方法用于在当前Index结点的后面插入一个Index结点,形成其right结点。并且插入的Index结点的right域为当前结点的right域。
unlink函数
final boolean unlink(Index
<K,V> succ
) {
return node
.value
!= null
&& casRight(succ
, succ
.right
);
}
unlink方法与link方法作用相反,其删除当前Index结点的right结点,即将当前Index结点的right指向当前Index结点的right.right域。
HeadIndex类
static final class HeadIndex<K,V> extends Index<K,V> {
final int level
;
HeadIndex(Node
<K,V> node
, Index
<K,V> down
, Index
<K,V> right
, int level
) {
super(node
, down
, right
);
this.level
= level
;
}
}
根据HeadIndex类可知其继承自Index类,并且在Index类的基础上添加了level域,表示当前的层级。
Node类
static final class Node<K,V> {
final K key
;
volatile Object value
;
volatile Node
<K,V> next
;
private static final sun
.misc
.Unsafe UNSAFE
;
private static final long valueOffset
;
private static final long nextOffset
;
static {
try {
UNSAFE
= sun
.misc
.Unsafe
.getUnsafe();
Class
<?> k
= Node
.class;
valueOffset
= UNSAFE
.objectFieldOffset
(k
.getDeclaredField("value"));
nextOffset
= UNSAFE
.objectFieldOffset
(k
.getDeclaredField("next"));
} catch (Exception e
) {
throw new Error(e
);
}
}
}
Node类包含了key、value、next域,其是用来实际存放元素的结点,并且是使用单链表结构。同时,也使用了反射来原子性的修改value与和next域。
构造函数
Node(K key
, Object value
, Node
<K,V> next
) {
this.key
= key
;
this.value
= value
;
this.next
= next
;
}
Node(Node
<K,V> next
) {
this.key
= null
;
this.value
= this;
this.next
= next
;
}
Node类包含了两种构造函数,分别表示正常的结点和marker标记结点,marker标记结点在删除结点时被使用。
helpDelete函数
void helpDelete(Node
<K,V> b
, Node
<K,V> f
) {
if (f
== next
&& this == b
.next
) {
if (f
== null
|| f
.value
!= f
)
casNext(f
, new Node<K,V>(f
));
else
b
.casNext(this, f
.next
);
}
}
删除结点,在结点后面添加一个marker结点或者将结点和其后的marker结点从其前驱中断开。
ConcurrentSkipListMap的属性
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>, Cloneable
, Serializable
{
private static final long serialVersionUID
= -8627078645895051609L
;
private static final Object BASE_HEADER
= new Object();
private transient volatile HeadIndex
<K,V> head
;
final Comparator
<? super K
> comparator
;
private transient KeySet
<K> keySet
;
private transient EntrySet
<K,V> entrySet
;
private transient Values
<V> values
;
private transient ConcurrentNavigableMap
<K,V> descendingMap
;
private static final sun
.misc
.Unsafe UNSAFE
;
private static final long headOffset
;
private static final long SECONDARY
;
static {
try {
UNSAFE
= sun
.misc
.Unsafe
.getUnsafe();
Class
<?> k
= ConcurrentSkipListMap
.class;
headOffset
= UNSAFE
.objectFieldOffset
(k
.getDeclaredField("head"));
Class
<?> tk
= Thread
.class;
SECONDARY
= UNSAFE
.objectFieldOffset
(tk
.getDeclaredField("threadLocalRandomSecondarySeed"));
} catch (Exception e
) {
throw new Error(e
);
}
}
}
ConcurrentSkipListMap包含了head属性,表示跳表的头结点,并且包含了一个比较器,值得注意的是,对于ConcurrentSkipListMap的使用,键必须能够进行比较,如传递了比较器或者键本身就能够进行比较。同时,也使用了反射来保证原子性的更新head域。
ConcurrentSkipListMap的构造函数
public ConcurrentSkipListMap() {
this.comparator
= null
;
initialize();
}
构造一个新的空映射,该映射按照键的自然顺序进行排序,即键K必须实现了Comparable接口,否则,会报错。
ConcurrentSkipListMap的核心函数
doPut函数
private V
doPut(K key
, V value
, boolean onlyIfAbsent
) {
Node
<K,V> z
;
if (key
== null
)
throw new NullPointerException();
Comparator
<? super K
> cmp
= comparator
;
outer
: for (;;) {
for (Node
<K,V> b
= findPredecessor(key
, cmp
), n
= b
.next
;;) {
if (n
!= null
) {
Object v
; int c
;
Node
<K,V> f
= n
.next
;
if (n
!= b
.next
)
break;
if ((v
= n
.value
) == null
) {
n
.helpDelete(b
, f
);
break;
}
if (b
.value
== null
|| v
== n
)
break;
if ((c
= cpr(cmp
, key
, n
.key
)) > 0) {
b
= n
;
n
= f
;
continue;
}
if (c
== 0) {
if (onlyIfAbsent
|| n
.casValue(v
, value
)) {
@SuppressWarnings("unchecked") V vv
= (V
)v
;
return vv
;
}
break;
}
}
z
= new Node<K,V>(key
, value
, n
);
if (!b
.casNext(n
, z
))
break;
break outer
;
}
}
int rnd
= ThreadLocalRandom
.nextSecondarySeed();
if ((rnd
& 0x80000001) == 0) {
int level
= 1, max
;
while (((rnd
>>>= 1) & 1) != 0)
++level
;
Index
<K,V> idx
= null
;
HeadIndex
<K,V> h
= head
;
if (level
<= (max
= h
.level
)) {
for (int i
= 1; i
<= level
; ++i
)
idx
= new Index<K,V>(z
, idx
, null
);
}
else {
level
= max
+ 1;
@SuppressWarnings("unchecked")Index
<K,V>[] idxs
=
(Index
<K,V>[])new Index<?,?>[level
+1];
for (int i
= 1; i
<= level
; ++i
)
idxs
[i
] = idx
= new Index<K,V>(z
, idx
, null
);
for (;;) {
h
= head
;
int oldLevel
= h
.level
;
if (level
<= oldLevel
)
break;
HeadIndex
<K,V> newh
= h
;
Node
<K,V> oldbase
= h
.node
;
for (int j
= oldLevel
+1; j
<= level
; ++j
)
newh
= new HeadIndex<K,V>(oldbase
, newh
, idxs
[j
], j
);
if (casHead(h
, newh
)) {
h
= newh
;
idx
= idxs
[level
= oldLevel
];
break;
}
}
}
splice
: for (int insertionLevel
= level
;;) {
int j
= h
.level
;
for (Index
<K,V> q
= h
, r
= q
.right
, t
= idx
;;) {
if (q
== null
|| t
== null
)
break splice
;
if (r
!= null
) {
Node
<K,V> n
= r
.node
;
int c
= cpr(cmp
, key
, n
.key
);
if (n
.value
== null
) {
if (!q
.unlink(r
))
break;
r
= q
.right
;
continue;
}
if (c
> 0) {
q
= r
;
r
= r
.right
;
continue;
}
}
if (j
== insertionLevel
) {
if (!q
.link(r
, t
))
break;
if (t
.node
.value
== null
) {
findNode(key
);
break splice
;
}
if (--insertionLevel
== 0)
break splice
;
}
if (--j
>= insertionLevel
&& j
< level
)
t
= t
.down
;
q
= q
.down
;
r
= q
.right
;
}
}
}
return null
;
}
说明:doPut提供对put函数的支持,doPut的大体流程如下:
① 根据给定的key从跳表的左上方往右或者往下查找到Node链表的前驱Node结点,这个查找过程会删除一些已经标记为删除的结点。
② 找到前驱结点后,开始往后插入查找插入的位置(因为找到前驱结点后,可能有另外一个线程在此前驱结点后插入了一个结点,所以步骤①得到的前驱现在可能不是要插入的结点的前驱,所以需要往后查找)。
③ 随机生成一个种子,判断是否需要增加层级,并且在各层级中插入对应的Index结点。 其中,会调用到findPredecessor函数,findPredecessor函数源码如下
private Node
<K,V> findPredecessor(Object key
, Comparator
<? super K
> cmp
) {
if (key
== null
)
throw new NullPointerException();
for (;;) {
for (Index
<K,V> q
= head
, r
= q
.right
, d
;;) {
if (r
!= null
) {
Node
<K,V> n
= r
.node
;
K k
= n
.key
;
if (n
.value
== null
) {
if (!q
.unlink(r
))
break;
r
= q
.right
;
continue;
}
if (cpr(cmp
, key
, k
) > 0) {
q
= r
;
r
= r
.right
;
continue;
}
}
if ((d
= q
.down
) == null
)
return q
.node
;
q
= d
;
r
= d
.right
;
}
}
}
说明:findPredecessor函数的主要流程如下。
从头结点(head)开始,先比较key与当前结点的key的大小,若key大于当前Index结点的key并且当前Index结点的right不为空,则向右移动,继续查找;若当前Index结点的right为空,则向下移动,继续查找;若key小于等于当前Index结点的key,则向下移动,继续查找。直至找到前驱结点。
doRemove函数
final V
doRemove(Object key
, Object value
) {
if (key
== null
)
throw new NullPointerException();
Comparator
<? super K
> cmp
= comparator
;
outer
: for (;;) {
for (Node
<K,V> b
= findPredecessor(key
, cmp
), n
= b
.next
;;) {
Object v
; int c
;
if (n
== null
)
break outer
;
Node
<K,V> f
= n
.next
;
if (n
!= b
.next
)
break;
if ((v
= n
.value
) == null
) {
n
.helpDelete(b
, f
);
break;
}
if (b
.value
== null
|| v
== n
)
break;
if ((c
= cpr(cmp
, key
, n
.key
)) < 0)
break outer
;
if (c
> 0) {
b
= n
;
n
= f
;
continue;
}
if (value
!= null
&& !value
.equals(v
))
break outer
;
if (!n
.casValue(v
, null
))
break;
if (!n
.appendMarker(f
) || !b
.casNext(n
, f
))
findNode(key
);
else {
findPredecessor(key
, cmp
);
if (head
.right
== null
)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv
= (V
)v
;
return vv
;
}
}
return null
;
}
doRemove函数的处理流程如下。
① 根据key值找到前驱结点,查找的过程会删除一个标记为删除的结点。
② 从前驱结点往后查找该结点。
③ 在该结点后面添加一个marker结点,若添加成功,则将该结点的前驱的后继设置为该结点之前的后继。
④ 头结点的next域是否为空,若为空,则减少层级。
下面的示意图给出了remove操作一种可能的情况(仅仅涉及Node结点的链表层的操作) 可以看到remove操作是分为两步进行的,首先是在要删除结点的后面添加一个marker结点,然后修改删除结点的前驱结点的next域。注意,这里仅仅只给出了Node结点的链表层的操作,并没有涉及到Index结点,关于Index结点的情况,之后会给出一个示例。其中会调用到tryReduceLevel函数,tryReduceLevel源码如下
private void tryReduceLevel() {
HeadIndex
<K,V> h
= head
;
HeadIndex
<K,V> d
;
HeadIndex
<K,V> e
;
if (h
.level
> 3 &&
(d
= (HeadIndex
<K,V>)h
.down
) != null
&&
(e
= (HeadIndex
<K,V>)d
.down
) != null
&&
e
.right
== null
&&
d
.right
== null
&&
h
.right
== null
&&
casHead(h
, d
) &&
h
.right
!= null
)
casHead(d
, h
);
}
如果最高的前三个HeadIndex不为空,并且其right域都为null,那么就将level减少1层,并将head设置为之前head的下一层,设置完成后,还有检测之前的head的right域是否为null,如果为null,则减少层级成功,否则再次将head设置为h。
doGet函数
private V
doGet(Object key
) {
if (key
== null
)
throw new NullPointerException();
Comparator
<? super K
> cmp
= comparator
;
outer
: for (;;) {
for (Node
<K,V> b
= findPredecessor(key
, cmp
), n
= b
.next
;;) {
Object v
; int c
;
if (n
== null
)
break outer
;
Node
<K,V> f
= n
.next
;
if (n
!= b
.next
)
break;
if ((v
= n
.value
) == null
) {
n
.helpDelete(b
, f
);
break;
}
if (b
.value
== null
|| v
== n
)
break;
if ((c
= cpr(cmp
, key
, n
.key
)) == 0) {
@SuppressWarnings("unchecked") V vv
= (V
)v
;
return vv
;
}
if (c
< 0)
break outer
;
b
= n
;
n
= f
;
}
}
return null
;
}
doGet函数流程比较简单,首先根据key找到前驱结点,然后从前驱结点开始往后查找,找到与key相等的结点,则返回该结点,否则,返回null。在这个过程中会删除一些已经标记为删除状态的结点。
size函数
public int size() {
long count
= 0;
for (Node
<K,V> n
= findFirst(); n
!= null
; n
= n
.next
) {
if (n
.getValidValue() != null
)
++count
;
}
return (count
>= Integer
.MAX_VALUE
) ? Integer
.MAX_VALUE
: (int) count
;
}
size函数的流程如下,首先利用findFirst函数找到第一个value不为null的结点。然后开始往后遍历,调用Node结点的getValidValue函数判断结点的value是否有效,有效则计数器加1。其中findFirst源码如下
final Node
<K,V> findFirst() {
for (Node
<K,V> b
, n
;;) {
if ((n
= (b
= head
.node
).next
) == null
)
return null
;
if (n
.value
!= null
)
return n
;
n
.helpDelete(b
, n
.next
);
}
}
findFirst函数的功能是找到第一个value不为null的结点。getValidValue源码如下
V
getValidValue() {
Object v
= value
;
if (v
== this || v
== BASE_HEADER
)
return null
;
@SuppressWarnings("unchecked") V vv
= (V
)v
;
return vv
;
}
若结点的value为自身或者是BASE_HEADER,则返回null,否则返回结点的value。
示例
下面通过一个简单的示例,来深入了解ConcurrentSkipListMap的内部结构。
package com
.hust
.grid
.leesf
.collections
;
import java
.util
.concurrent
.ConcurrentSkipListMap
;
public class ConcurrentSkipListMapDemo {
public static void main(String
[] args
) {
ConcurrentSkipListMap
<String, Integer> cslm
= new ConcurrentSkipListMap<String, Integer>();
cslm
.put("leesf", 24);
cslm
.put("dyd", 24);
for (String key
:cslm
.keySet()) {
System
.out
.print("[" + key
+ "," + cslm
.get(key
) + "] ");
}
System
.out
.println();
cslm
.remove("leesf");
for (String key
:cslm
.keySet()) {
System
.out
.print("[" + key
+ "," + cslm
.get(key
) + "] ");
}
}
}
运行结果:
[dyd,24] [leesf,24] [dyd,24]
上面的一个示例非常简单,下面借这个示例,来分析ConcurrentSkipListMap的内部结构。
① 当新生一个ConcurrentSkipListMap时,有如下结构。 ② 当put(“leesf”, 24)后,可能有如下结构 在插入一个Node结点的同时,也插入一个Index结点,并且head结点的right域指向该Index结点,该Index的Node域指向插入的Node结点。
③ 当put(“dyd”, 24)后,可能有如下结构 插入的(“dyd”, 24),新生成的结点在leesf结点之前,并且也生成了一个Index结点指向它,此时跳表的层级还是为1。
④ 同样,当put(“dyd”, 24)后,也可能有如下结构 在插入(“dyd”, 24)后,层级加1,此时会生成两个Index结点,并且两个Index结点均指向新生成的Node结点。
⑤ 在remove(“dyd”)之后,存在如下结构 在key为dyd的结点后面添加了一个marker结点(key为null,value为自身),并且Node结点对应的Index也将从Index链表中断开,最后会被GC。
转载至:【JUC】JDK1.8源码分析之ConcurrentSkipListMap(二)