ConcurrentHashMap源码分析(JAVA 8)(综述)

    xiaoxiao2025-03-13  64

    文章目录

    一、ConcurrentHashMap缘起二、ConcurrentHashMap 的升级打怪                                  ---jdk1.7、jdk1.8实现方式存在差异三、ConcurrentHashMap武器解析(底层实现)(一) 常量(二) 构造函数(三) 主要的内部静态类(三) 关键方法 四、引用五、结语

    一、ConcurrentHashMap缘起

    HashMap存在线程安全

           话说在hashMap出现之际,就获得了无数java开发者的青睐,但是随着数据量的增多,迫使去解决多线程并发访问,首先在java7中由于采用的扩容方式(参考:https://blog.csdn.net/qq_32679835/article/details/90447248#_330) 会产生死锁的局面,即使在Java8中改进了原来头插扩容方式,依旧会存在缺少键值对的情况,必须去找到一个替代者去解决HashMap线程不安全的处境,HashTable作为老牌势力,虽然满足了线程安全的要求,但是他是通过synchronized关键字实现,通过互斥锁效率问题就值得商榷。

    Collection.synchronizedMap解决问题

            跟Hashtable可以说是难兄难弟,只是将Map的相应功能添加同步(synchrinized字段实现),性能与吞吐量存在问题,需要去选择一种新型数据结构来达到线程安全,此时ConcurrentHashMap出现了。

    二、ConcurrentHashMap 的升级打怪

                                      —jdk1.7、jdk1.8实现方式存在差异

    JDK 1.7

    1、HashMap还没有加入红黑树的概念,ConcurrentHashMap 选择通过segment分段锁来实现并发,每一个segment会负责管理HashMap数组中的几个桶,这样每一次对数组的操作就不需要锁定整个数组,而是只需要获得当前segment锁,相当于将锁的粒度细化。 此时考虑size()这些具有全局功能的函数,你要获取总共插入的元素个数,必然要获取所有segment锁,之后获取总数,显得有点麻烦,因此在此之前首先会先试图使用无锁的方式统计总数,这个尝试会进行3次,如果在相邻的2次计算中获得的Segment的modCount次数一致,代表这两次计算过程中都没有发生过修改操作,那么就可以当做最终结果返回。 2、ReentrantLock+segment+HashEntry实现

    jdk 1.8

    1、加入了红黑树的概念,将原本在链表中查找数据的时间复杂度O(n)修改为O(log n) 2、修改7中扩容的头插方式,能够很好的避免了死锁问题 3、虽然在1.8中依旧保存segment的静态类,完全摒弃了了segment分段锁的概念,而是选择了一种更加细粒度的操作,在每一个数组的node节点通过synchronized来同步 4、synchronized+cas+HashEntry实现

    三、ConcurrentHashMap武器解析(底层实现)

    (一) 常量

    // node数组最大容量:2^30=1073741824 private static final int MAXIMUM_CAPACITY = 1 << 30; // 默认初始值,必须是2的幕数 private static final int DEFAULT_CAPACITY = 16; //数组可能最大值,需要与toArray()相关方法关联 static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //并发级别,遗留下来的,为兼容以前的版本 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 负载因子 private static final float LOAD_FACTOR = 0.75f; // 链表转红黑树阀值,> 8 链表转换为红黑树 static final int TREEIFY_THRESHOLD = 8; //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo)) static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64; private static final int MIN_TRANSFER_STRIDE = 16; private static int RESIZE_STAMP_BITS = 16; // 2^15-1,help resize的最大线程数 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 32-16=16,sizeCtl中记录size大小的偏移量 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // forwarding nodes的hash值 static final int MOVED = -1; // 树根节点的hash值 static final int TREEBIN = -2; // ReservationNode的hash值 static final int RESERVED = -3; // 可用处理器数量 static final int NCPU = Runtime.getRuntime().availableProcessors(); //存放node的数组 transient volatile Node<K,V>[] table; //控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义 //当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容 //当为0时:代表当时的table还没有被初始化 //当为正数时:表示初始化或者下一次进行扩容的大小 private transient volatile int sizeCtl;

    (二) 构造函数

    ConcurrentHashMap()以默认常量初始化ConcurrentHashMap(int initialCapacity)以给定容量初始化,初始化容量为大于initialCapacity最小二次幂(tableSizeFor()产生)ConcurrentHashMap(Map<? extends K, ? extends V> m)容量为传入的Map大小,迭代(entrySet()函数)后通过putVal()传参ConcurrentHashMap(int initialCapacity, float loadFactor)默认调用底下参数ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel)构造size = (long)(1.0 + (long)initialCapacity / loadFactor)大小的Map

    (三) 主要的内部静态类

    Node类

    1、属性:hash(每一个元素的标识符,在1.7中用来定位segment的位置和数组位置,1.8中用来定位数组位置)、key、value、next 2、 1、需要提到的是find()方法,对map.get()提供虚拟支持,在Node类的子类中需要重写find()方法 2、不允许直接设置value值,会抛出异常

    Node<K,V> find(int h, Object k) { Node<K,V> e = this; if (k != null) { do { K ek; if (e.hash == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; } while ((e = e.next) != null); } return null; } TreeNode类:构成二叉树的基本元素,重写了find()方法TreeBin类:红黑树的头部节点,选择了cas+读写锁的策略,能够使得在writer之前需要需要等待reader完成,使用TreeBin代替了TreeNode节点,实际上在table数组中,存放的是TreeBin对象ForwardingNode类:数组扩容时候数据转移的临时节点,一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。CounterCell 主要用来计数,首先整体的计数肯定通过cas操作baseCount变量,但是由于多线程操作,肯定会出现自选失败的情况,如果出现自旋失败,就需要去存储到对应的COunterCell数组中,在获取ConcurrenthashMap总个数的时候,只需要baseCount+CounterCell中的数量

    (三) 关键方法

    Native方法:保证了在多线程下的可见性和有序性,遵循内存屏障 //在tab中查找偏移量为i的Node节点 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) //通过cas设置偏移量为i的值,如果等于期待值就设置变量 static final <K,V> Boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) //通过cas设置偏移量为i的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) initTable():初始化数组 如果自旋设置sizeCtl =-1,表明当前线程正在初始化,初始化完成之后,sizeCtl 等于总容量的3/4,作为触发扩容的阈值 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); //有其他线程在初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//自旋,如果能够成功,则进入初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//默认容量为16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2);//容量为总容量的3/4 } } finally { sizeCtl = sc; } break; } } return tab; } get()方法:获取key值对应的value public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //计算hash,通过高16位与低16位异或进行扰乱,避免hash冲突的发生 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) //链表头结点 return e.val; } else if (eh < 0) //特殊节点,通过find方法查找 /** static final int MOVED = -1; // hash for forwarding nodes(扩容时候使用) static final int TREEBIN = -2; // hash for roots of trees(转化为二叉树使用) static final int RESERVED = -3; // hash for transient reservations **/ return (p = e.find(h, key)) != null ? p.val : null; //链表查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

    put()方法:与HashMap方法相同,调用putval添加值

    只需要理解每一个函数出现的作用,因为COncurrentHashMap涉及到扩容,因此在之后扩容部分,将详细讲解helpTransfer()、addCount()

    整个添加元素的过程 (1)首先获取元素hash值,如果数组没有初始化,需要去初始化数组,之后去添加元素 (2)空节点中添加了forward节点(虚拟表示:MOVED),说明有其他线程在扩容需要去协助扩容 (3)如果是插入到链表中(fh>0),插入首节点直接插入,插入节点key值相同,则覆盖,否则遍历链表,插入链表结尾 (4)如果是红黑树节点,插入到红黑树 (5)判断链表节点加入后是否超过阈值8,超过阈值转化为红黑树 (6)计数,在ConcurrentHashMap中元素个数

    final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //如果为null或者table数组未初始化 if (tab == null || (n = tab.length) == 0) tab = initTable(); //添加到链表头结点 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //有一个线程在扩容,需要当前线程协助扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //锁定当前节点,添加元素 synchronized (f) { if (tabAt(tab, i) == f) { //表明是非树节点 if (fh >= 0) { binCount = 1; //遍历链表所有节点 for (Node<K,V> e = f;; ++binCount) { K ek; //key值一致,则节点覆盖 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //已经到链表尾部,添加到链表结尾 if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } //添加树节点 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //链表节点数目大于8,转化为红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //将当前ConcurrentHashMap的元素数量+1 addCount(1L, binCount); return null; } 扩容函数 1、 addCount()计数函数涉及到的扩容部分(为了理解transfer) private final void addCount(long x, int check) { //CounterCell实现单个线程的计数.....(省略了计数,后续补充) if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //触发扩容 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //扩容标识符 int rs = resizeStamp(n); //sc<0,其他线程在扩容 if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //当前已经有其他线程在扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //唯一或者第一个扩容的线程 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }

    首先是resizeStamp(),该函数返回一个用于数据校验的标志位,意思是对长度为n的table进行扩容。它将n的前导零(最高有效位之前的零的数量)和1 << 15做或运算,这时低16位的最高位为1,其他都为n的前导零。

    static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); }

    rs << RESIZE_STAMP_SHIFT) + 2含义,首先RESIZE_STAMP_SHIFT = 32-RESIZE_STAMP_BITS(默认为16),rs是扩容的标识符,将该标识符左移16位,剩余的16位就表示扩容线程的数量,由于-1表示table数组初始化,因此会选择+2的方式,特别是该操作需要理解,在后边会使用。

    2、transfer() 单线程扩容 它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素: (1)如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点; (2)如果这个位置是Node节点(fh>=0),根据hash&n 判断高位,将该链表拆分为原位置链表和i+n(原始数组容量)的链表,放在对应位置 (3)如果这个位置是TreeBin节点(fh<0),根据hash&n 判断高位,将该红黑树拆分为原位置红黑树和i+n(原始数组容量)的红黑树,放在对应位置 (4)遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍,完成扩容。 多线程扩容 如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。

    //一个过渡的table表 只有在扩容的时候才会使用 private transient volatile Node<K,V>[] nextTable; private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //确定好每一个线程所负责的桶范围 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null) { // 初始化桶,为原来容量的2倍 try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; //设置forwardingNode节点,用于标志位 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//在扩容是忽略,有这个标识的bucket标识已经有其他线程处理(将对应桶中的数据放置到新数组中) boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing)//当前线程已经分配了bucket区域,但是依旧存在未处理的桶 advance = false; else if ((nextIndex = transferIndex) <= 0) {//所有bucket已经分配完毕 i = -1; advance = false; } //为当前线程设置负责的bucket范围 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; //所有节点完成复制,则清空nextTable并将新数组赋值给原始数组 if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } //扩容线程数量-1,有很多线程,高16位标识扩容,低16位标识扩容线程数量 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //当前还有其他线程正在扩容,直接返回,如果==,就表示当前只有本线程扩容(参考addCout理解) if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //空节点直接设置为fwd节点 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // 其他线程已经处理当前bucket else { //锁定当前节点 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { //与HashMap扩容原理相同,产生原链表和反序链表 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //直接复制到原数组位置 if (runBit == 0) { ln = lastRun; hn = null; } else { //复制到i+原数组容量的位置 hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln);//添加到原数组 setTabAt(nextTab, i + n, hn);//添加到i+n的位置 setTabAt(tab, i, fwd); advance = true;//该bucket已经处理 } //树节点扩容同样思路,产生原始位置的红黑树与i+n位置的红黑树 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //转换后不满足阈值6,将二叉树转化为链表 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }

    3、tryPresize()扩容函数应用:再理解一下多线程扩容

    private final void tryPresize(int size) { //初始化容量 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; // sizeCtl是默认值或正整数 // 代表table还未初始化 // 或还没有其他线程正在进行扩容 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; //初始化 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc;//容量 } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; //与addCount异曲同工,开始扩容 else if (tab == table) { int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; //标识位不相等,扩容结束 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //增加一个线程扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //唯一或者第一个线程扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } } 计数addCount():补充前边省略的部分 (1)更新baseCount值 (2)检查是否需要扩容,并扩容(前边已经说明) private final void addCount(long x, int check) { CounterCell[] as; long b, s; // cas更新baseCount失败,每个线程都会通过ThreadLocalRandom.getProbe() & m寻址找到属于它的CounterCell,然后进行计数。 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //调用fullAddCount(),该函数负责初始化CounterCells和更新计数 fullAddCount(x, uncontended); return; } if (check <= 1) return; //统计总数(baseCount+所有CounterCells数组个数) s = sumCount(); } //.....扩容省略 }

    (3)fullAddCount()初始化CounterCells和更新计数

    private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); //相当于找到每一个线程对应的Hash h = ThreadLocalRandom.getProbe(); wasUncontended = true;//未竞争标识 } boolean collide = false; // 冲突标识,True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v;+ if ((as = counterCells) != null && (n = as.length) > 0) { if ((a = as[(n - 1) & h]) == null) { //0和1,当它被设置为0时表示没有加锁,当它被设置为1时表示已被其他线程加锁 if (cellsBusy == 0) { // Try to attach new Cell //通过内部类,添加数量 CounterCell r = new CounterCell(x); //自旋加锁 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; //添加计数变量 try { // Recheck under lock CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0;//释放锁 } // 如果未成功 // 代表as[(n - 1) & h]这个位置的Cell已经被其他线程设置 // 那么就从循环头重新开始 if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // (a = as[(n - 1) & h]) != null 表示有其他线程竞争 wasUncontended = true; // 需要去重新计算hash else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))//成功则退出循环 break; else if (counterCells != as || n >= NCPU) // 达到了数组容量最大值,无法扩容 collide = false; else if (!collide) collide = true; else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {//扩容为原来的2倍,并数据迁移到新数组 CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h);//当前线程重新计算hash } //表明数组未初始化 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // 初始化counterCells 数组 if (counterCells == as) { CounterCell[] rs = new CounterCell[2];//初始化容量为2 rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } // 初始化CounterCell数组成功,退出循环 if (init) break; } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; //失败则使用baseCount } }

    四、引用

    【1】 jdk1.7 segment结构图:https://sylvanassun.github.io/2018/03/16/2018-03-16-map_family/#more 【2】参考主力:https://blog.csdn.net/yjp198713/article/details/79004798#forwardingnode 【3】参考主力:https://blog.csdn.net/qq_30336433/article/details/82586416#扩容

    五、结语

    看Concurrenthashmap比较吃力,主要是分析了1.8中的特性,对于1.7中的特性只是提到了一小块,其中还有很多部分需要日后继续补充。

    最新回复(0)