java Map

    xiaoxiao2022-07-05  167

    HashMap

    JDK1.7 HashMap

    1.7 数组+链表方式存储数据,数组是Entry对象组成的数组,对key进行hash,hash后的结果作为数组下标,如果不同key的hash结果相同,就将Entry数据组成链表存放在对应的数组下

    变量

    /** * 默认容器容量 */ static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 /** * 容器最大容量 */ static final int MAXIMUM_CAPACITY = 1 << 30; /** * 加载因子 */ static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * 数组,桶 */ static final Entry<?,?>[] EMPTY_TABLE = {}; /** * 数组,根据需要调整。长度必须是2的幂。 */ transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE; /** * map的长度 */ transient int size; /** * 桶大小,可在初始化时显式指定。调整扩展容量大小的话,是当前容量*0.75 * The next size value at which to resize (capacity * load factor). */ int threshold; /** * The load factor for the hash table. * 负载因子,可在初始化时显式指定,默认0.75 */ final float loadFactor; /** * HashMap fail-fast,循环HashMap,在循环里面添加、修改或删除key的话,会报错 */ transient int modCount;

    初始化

    public HashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR); } public HashMap(int initialCapacity, float loadFactor) { if (initialCapacity < 0) throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; if (loadFactor <= 0 || Float.isNaN(loadFactor)) throw new IllegalArgumentException("Illegal load factor: " + loadFactor); this.loadFactor = loadFactor; threshold = initialCapacity; init(); }

    无参数初始化HashMap的话,使用默认的容器大小 threshold = 16,默认加载因子 loadFactor=0.75,当数据大小达到了 16 * 0.75 = 12 时,就会对当前16的容量进行扩容,而扩容要涉及到 rehash、数据复制等操作,所以非常消耗性能,最好能预知map大小,避免扩容。

    put方法

    public V put(K key, V value) { if (table == EMPTY_TABLE) { inflateTable(threshold); } if (key == null) return putForNullKey(value); int hash = hash(key); int i = indexFor(hash, table.length); for (Entry<K,V> e = table[i]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) { V oldValue = e.value; e.value = value; e.recordAccess(this); return oldValue; } } modCount++; addEntry(hash, key, value, i); return null; } 如果table是null,table数组懒加载,创建了map的时候并没有创建table如果key是null,保存key为null的数据到table给key做hash,根据hash确定数组下标for循环table的下标为止数据,如果存在多个数据的话说明hash碰撞,判断当前的key在链表中是否有相同的,如果有更新value,没有的话,就拉到跳出循环modCount自增addEntry添加数据到Entry数组 /** * Inflates the table. */ private void inflateTable(int toSize) { // Find a power of 2 >= toSize int capacity = roundUpToPowerOf2(toSize); threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1); table = new Entry[capacity]; initHashSeedAsNeeded(capacity); } private static int roundUpToPowerOf2(int number) { // assert number >= 0 : "number must be non-negative"; return number >= MAXIMUM_CAPACITY ? MAXIMUM_CAPACITY : (number > 1) ? Integer.highestOneBit((number - 1) << 1) : 1; } final boolean initHashSeedAsNeeded(int capacity) { boolean currentAltHashing = hashSeed != 0; boolean useAltHashing = sun.misc.VM.isBooted() && (capacity >= Holder.ALTERNATIVE_HASHING_THRESHOLD); boolean switching = currentAltHashing ^ useAltHashing; if (switching) { hashSeed = useAltHashing ? sun.misc.Hashing.randomHashSeed(this) : 0; } return switching; } roundUpToPowerOf2 计算出大于toSize最临近的2的N此方的值,如果传入的容量是capacity=18那么计算出来的结果就是capacity=32,然后Math.min()得到最终的触发扩容的阀值threshold=24,new Entry大小就是32initHashSeedAsNeeded负责初始化 hashseed 的种子 final int hash(Object k) { int h = hashSeed; if (0 != h && k instanceof String) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); // This function ensures that hashCodes that differ only by // constant multiples at each bit position have a bounded // number of collisions (approximately 8 at default load factor). h ^= (h >>> 20) ^ (h >>> 12); return h ^ (h >>> 7) ^ (h >>> 4); }

    可以看到只有key为字符类型,而且hashSeed不为0时才采用新的哈希算法;sun.misc.Hashing.stringHash32实际上调用的是String.hash32方法:

    void addEntry(int hash, K key, V value, int bucketIndex) { if ((size >= threshold) && (null != table[bucketIndex])) { resize(2 * table.length); hash = (null != key) ? hash(key) : 0; bucketIndex = indexFor(hash, table.length); } createEntry(hash, key, value, bucketIndex); } void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; } Entry[] newTable = new Entry[newCapacity]; transfer(newTable, initHashSeedAsNeeded(newCapacity)); table = newTable; threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1); } void createEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<>(hash, key, value, e); size++; } 判断数据长度如果大于等于阀值,并且当前要插入所在数组下标的数据不是空。那就需要扩容数组容量,resize数组容量为当前的2倍,具体在方法里创建了新数组。重新计算各个key的hash值,因为可能hashseed种子已经有变化了,保存到新数组。重新计算新的扩容阀值扩容之后createEntry,首先拿到对应下标的数据,如果有的话 创建新的Entry对象放到链表的首位,之前的值放到新entry的next位置。

    get 方法

    public V get(Object key) { if (key == null) return getForNullKey(); Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue(); } getForNullKey,因为null的key都保存到了数组下标0的位置上,这里直接去下标0位置上的链表,如果有值就找到null的value,返回key不是null,就getEntry(Object key),  hash(key),计算下标,获取对应下标在数组上的链表,equals,key得到Entry对象返回。

    JDK1.8 HashMap

    1.8的map和1.7的实现上有很多的不同,同样是数组+链表的形式存储数据,只不过链表数据达到阀值之后,会转换成红黑树。在1.7中如果所有的记录都在一个桶上,那么平均查询一条记录要遍历一半的链表,时间复杂度是O(n)。1.8中的时间复杂度则是O(logn)。

    基本参数

    /** * 默认的容量16 */ static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 /** * 最大容量 */ static final int MAXIMUM_CAPACITY = 1 << 30; /** * 默认加载因子 */ static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * 链表转红黑树的阀值,大于8 */ static final int TREEIFY_THRESHOLD = 8; /** * 红黑树转链表的阀值,=6个节点时转链表 */ static final int UNTREEIFY_THRESHOLD = 6; /** * 转红黑树时,table的最小长度 */ static final int MIN_TREEIFY_CAPACITY = 64;

    hash定位

    static final int hash(Object key) { int h; return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); } int n = tab.length int index = tab[(n - 1) & hash]

    不管是增加,删除,还是查找,定位到哈希桶的位置都是至关重要的,HashMap是数组+链表+红黑树的组合,提高效率的方式就是尽量哈希的均匀。

    key是null,直接就放到了数组第一个位置上,不为空的话,拿到key的hashCode,将hashCode的高16位参与运算  >>>(无符号右移)  无符号右移,忽略符号位,空位都以0补齐.

    计算HashCode的话,沿用传入key参数的hashCode方法

    之后用数组长度减一与key的hash后的值,,得到最终的数组index

    例如:字符串A作为key的话String.hashCode后的结果是65^(65>>>16)=65

    然后 16(默认table长度)-1 & 65,得到的结果就是1 

    15的二进制是 0000 1111

    65的二进制是 0100 0001

    与之后的结果就是 0000 0001,就等于 1

     

    put方法

    final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K,V>[] tab; Node<K,V> p; int n, i; //table为空,则创建table if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; //创建完table之后,判断指定index位置是否是null,并赋值给读对象 p if ((p = tab[i = (n - 1) & hash]) == null) //如果是null,创建Node对象,放入指定index位置 tab[i] = newNode(hash, key, value, null); else { //如果不是null Node<K,V> e; K k; //这里判断p的数据是否和传递进来的key一致,赋值给e if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; //是否是红黑树node, else if (p instanceof TreeNode) e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); else { //不是已经存在的key,也不是红黑树节点,正常添加链表 //binCount用于计数,统计节点数 for (int binCount = 0; ; ++binCount) { //p.next==null,说明循环到了链表的最后一个node上 if ((e = p.next) == null) { //创建node,当前节点的next指向新node p.next = newNode(hash, key, value, null); if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st //链表超过了红黑树阀值,将链表转换成为红黑树 treeifyBin(tab, hash); break; } //这里是p.next!=null,同上面的判断一致,判断key和循环到的e一不一样,一样的话跳出循环 //会进入下面的e!=null的判断里面更新value去 //这里的这个判断,个人感觉并发情况下的话,可能会出现2个线程数据都进入到for循环, //第一个线程执行时next==null,之后加入链表 //第二个线程同时也put了相同的key并且也在for里面的话next就!=Null了, //这个时候这个if就会判断,然后跳出去 if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; //让p指向下一个节点 p = e; } } //如果进入了这个if,既说明key是相等的,就是已经存在的key,这里更新value值,返回旧结果值 if (e != null) { // existing mapping for key V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; //linkedHashMap使用 afterNodeAccess(e); return oldValue; } } ++modCount; //超过数组容量阀值,进行扩容 if (++size > threshold) resize(); //linkedHashMap使用 afterNodeInsertion(evict); return null; }

    get方法

    final HashMap.Node<K,V> getNode(int hash, Object key) { //桶 HashMap.Node<K,V>[] tab; //首节点first,当前节点 e HashMap.Node<K,V> first, e; //桶长度,key int n; K k; //桶不是空,并且桶的长度大于0 并且首节点不是null if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { //首节点就是当前要获取的节点,直接返回首节点 if (first.hash == hash && // always check first node ((k = first.key) == key || (key != null && key.equals(k)))) return first; if ((e = first.next) != null) { //首节点是红黑树 if (first instanceof HashMap.TreeNode) //红黑树查找 TreeNode return ((HashMap.TreeNode<K,V>)first).getTreeNode(hash, key); do { //不是红黑树,并且首节点不是要找的节点,循环first.next if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } return null; }

    getTreeNode(hash, key)方法

    /** * Calls find for root node. */ final TreeNode<K,V> getTreeNode(int h, Object k) { //找到根节点,调用根节点的find方法 return ((parent != null) ? root() : this).find(h, k, null); } /** * Returns root of tree containing this node. */ final TreeNode<K,V> root() { //找根节点,循环遍历当前的treeNode,什么时候parent=null,说明代了树根节点 for (TreeNode<K,V> r = this, p;;) { if ((p = r.parent) == null) return r; r = p; } } /** * Finds the node starting at root p with the given hash and key. * The kc argument caches comparableClassFor(key) upon first use * comparing keys. * 此处是红黑树的遍历, 红黑树是特殊的自平衡二叉查找树 * 平衡二叉查找树的特点:左节点<根节点<右节点 */ final TreeNode<K,V> find(int h, Object k, Class<?> kc) { //当前红黑树对象 TreeNode<K,V> p = this; do { //第一次循环从根开始, int ph, dir; K pk; TreeNode<K,V> pl = p.left, pr = p.right, q; //当前树节点的hash值大于传入的hash,说明h在当前节点的左边 if ((ph = p.hash) > h) p = pl;//设置p为左边树 else if (ph < h) //如果小于了,说明h在当前节点的右边 p = pr; //设置p为右边树 //本次循环的节点key等于传入的key,则p就是目标节点,直接人 return else if ((pk = p.key) == k || (k != null && k.equals(pk))) return p; //代码能走到这里,说明,hash值相同,但是key不同 else if (pl == null)//p节点的左边树空,设置p为右边树 p = pr; //代码能走到这里,说明,hash值相同,但是key不同,pl还不等于null else if (pr == null)//p节点的右边树空,设置p为左边树 p = pl; //代码能走到这里,说明,hash值相同,但是key不同,pl。pr都不是null else if ((kc != null || //kc首次进来的时候一定是null。就会调用comparableClassFor() //comparableClassFor就是去反射这个传进来的key是string还是对象类型 //如果是String直接返回,如果是对象类型,拿到对象类型的所有接口,找到接口范型类是comparable的 //也就是范型<comparable>这种,并且范型还只有一个,不能有多个,不能是<k,v>这种 //最后返回这个类型, 也就是实现了comparable接口的类,否则是String,否则是null //compareComparables方法先判断kc和当前传入的k是不是同一个类型,是的话就直接排序, //string的话直接comparable,或者用其他对象类型自定义的comparable实现来排序 //并且返回的结果不相等 //这个对比是传入k和当前节点的pk排序,例如,k="A" ,pk="b",那么"A".compareTo("B")=-1, (kc = comparableClassFor(k)) != null) && (dir = compareComparables(kc, k, pk)) != 0) //小于0让左边树遍历,大于0往右边树遍历 p = (dir < 0) ? pl : pr; //代码能走到这里,说明,hash值相同,但是key不同,pl。pr都不是null,k的类型没有实现comparable //直接定向先往右边树递归遍历 else if ((q = pr.find(h, k, kc)) != null) return q; else //代码能走到这里,说明,hash值相同,但是key不同,pl。pr都不是null,k的类型没有实现comparable, //定向向右边树遍历结果为null,因此直接往左边遍历 p = pl; } while (p != null); return null; }

     ConcurrentHashMap

    为什么要使用ConcurrentHashMap

    线程不安全的HashMap

    在多线程的环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry

        2. 效率地下的HashTable

    HashTable的大部分方法都使用了synchronized来保证线程安全,所以多线程竞争下HashTable的效率非常低下。

        3. ConcurrentHashMap分段锁

    ConcurrentHashMap分段锁首先将数据分成一段一段地存储,然后给每一段数据配置一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

    ConcurrentHashMap的结构

    1.7 ConcurrentHashMap

    ConcurrentHashMap采用分段锁技术,只有同一个分段内的数据才存在竞争关系,操作的时候不需要锁定整个map。

    segment由数组组成,每一个segment下面由一个table数组,基于key的hash,确定table数组的位置,相同hash不同key的数据组成链表。

    基本参数

    /** * 默认的table数组容量 */ static final int DEFAULT_INITIAL_CAPACITY = 16; /** * 加载因子 */ static final float DEFAULT_LOAD_FACTOR = 0.75f; /** * 默认并发级别 */ static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * table数组最大容量 */ static final int MAXIMUM_CAPACITY = 1 << 30; /** * 最小的segment数组大小 */ static final int MIN_SEGMENT_TABLE_CAPACITY = 2; /** * 最大的segment */ static final int MAX_SEGMENTS = 1 << 16; // slightly conservative /** * 这是为了避免使用*无限重试,如果表进行不断的修改,这将使其无法获得准确的结果。 */ static final int RETRIES_BEFORE_LOCK = 2;

    ConcurrentHashMap初始化

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //加载因子不大于0,默认hash桶数组大小小于0,并发深度小于0,报错 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //并发深度大于最大深度可; if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments // 找到最好的2的幂,意思就是如果传入的concurrencyLevel不是2的幂, // 那么就从1开始往左边位移,直到距离concurrencyLevel最近的2的幂 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //如果是默认值的话,经过上面的循环,sshift是4,而ssize是16 //那么segmentShift就是28 // segmentMask就是15 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; //table数组大小 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] //创建Segment和Segment[]数组并初始化了Segment[]数组第0个元素 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }

    get方法

    public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); //传入的key做了hash之后,再次做偏移,找到segment long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { //如果找到了segment,并且segment对应的table也不是空 //循环查找key,找到了key返回value for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }

    put方法

    public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); //key计算hash int hash = hash(key); //计算在Segment中的位置 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //查找Segment的位置,未发现有数据,则创建, //因为在初始化map的时候只是创建了Segment[0] s = ensureSegment(j); return s.put(key, hash, value, false); } private Segment<K,V> ensureSegment(int k) { //拿到当前分段Segment[]数组 final Segment<K,V>[] ss = this.segments; //要创建的Segment的偏移量位置 long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { //如果对应位置上还是null,利用初始化时候的segment[0]的配置参数 Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); //创建Segment上的HashEntry数组 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; //重新检查 if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck //CAS创建Segment Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } final V put(K key, int hash, V value, boolean onlyIfAbsent) { //创建完新的segment或者找到了segment,就会进入这个方法 //segment对象继承了ReentrantLock, //这里先是tryLock一次,如多拿到锁就往下走, //如果没拿到锁,执行scanAndLockForPut多次尝试tryLock, //并返回HashEntry HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; 这个hash是key的hash,现在计算key在HashEntry中的位置 int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); //循环定位HashEntry,存储数据 for (HashEntry<K,V> e = first;;) { //e不是空的,说明已经有数据存在,操作链表 if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { //node不是空,是scanAndLockForPut返回的, //然后将next设置为first,node在链表第一位 if (node != null) node.setNext(first); else //node是null,就创建一个 node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //是否扩容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else //存node数据 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }

    rehash()

    在put的时候,有可能会触发扩容

    private void rehash(HashEntry<K,V> node) { /* * */ //rehash发生在某个segment的table下 HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; //新的容量是旧容量的2倍 int newCapacity = oldCapacity << 1; //新的扩容阀值 threshold = (int)(newCapacity * loadFactor); //新的HashEntry数组,扩容后的 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; //循环旧的table for (int i = 0; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; //定位当前的HashEntry在新的HashEntry数组中的位置 int idx = e.hash & sizeMask; //HashEntry中只有一个 if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot //进入到这里说明链表有多个HashEntry组成 HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } //到这里是把链表的最后一个HashEntry放入新的数组,也就是最先进入链表的数据 newTable[lastIdx] = lastRun; // Clone remaining nodes //继续循环链表 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } //在存放要插入的node,新node的index int nodeIndex = node.hash & sizeMask; // add the new node //新node的next,指向当前HashEntry上对应位置的对象 node.setNext(newTable[nodeIndex]); //HashEntry数组对应位置指针指向当前node //例如,1,2,3,4分别put,那么从左到右依次是4,3,2,1 newTable[nodeIndex] = node; table = newTable; }

    1.8 ConcurrentHashMap

    ConcurrentHashMap在1.8中的实现,相比于1.7的版本基本上全部都变掉了。首先,取消了Segment分段锁的数据结构,取而代之的是数组+链表(红黑树)的结构。而对于锁的粒度,调整为对每个数组元素加锁(Node)。然后是定位节点的hash算法被简化了,Hash冲突会加剧。链表节点数量大于8时,会将链表转化为红黑树进行存储。查询的时间复杂度就会由原先的O(n)变为O(logN)。

     

     

    /** * table最大容量 */ private static final int MAXIMUM_CAPACITY = 1 << 30; /** * table默认容量 */ private static final int DEFAULT_CAPACITY = 16; /** * key转数组的时候最大长度,concurrentHashMap.keySet().toArray(); */ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * 表示默认的并发级别,也就是table[]的默认大小 */ private static final int DEFAULT_CONCURRENCY_LEVEL = 16; /** * 默认负载因子 */ private static final float LOAD_FACTOR = 0.75f; /** * 链表转红黑树的阀值,当table[i]下面的链表长度大于8时就转化为红黑树结构 */ static final int TREEIFY_THRESHOLD = 8; /** * 红黑树转链表的阀值,当链表长度<=6时转为链表(扩容时) */ static final int UNTREEIFY_THRESHOLD = 6;

    初始化

    private transient volatile int sizeCtl;

    sizeCtl用于table[]的初始化和扩容操作,不同值的代表状态如下:

    -1 :代表table正在初始化,其他线程应该交出CPU时间片 -N: 表示正有N-1个线程执行扩容操作(高 16 位是 length 生成的标识符,低 16 位是扩容的线程数) 大于 0: 如果table已经初始化,代表table容量,默认为table大小的0.75,如果还未初始化,代表需要初始化的大小

    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads long size = (long)(1.0 + (long)initialCapacity / loadFactor); int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); //初始化之后sizeCtl=table数组大小 this.sizeCtl = cap; }

    tableSizeFor,计算table数组大小

    int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); //这个方法为了计算出传入参数最临近的2的N次方的一个数 private static final int tableSizeFor(int c) { //例如传进来的是25,那么-1之后是24,目测距离最近的2的N次方的数是32 int n = c - 1; //将n>>>1的结果按位或赋值给n,第一次结束n>>>1=12,和n=24进行按位或=28 n |= n >>> 1; //将n>>>1的结果按位或赋值给n,第二次结束n>>>2=7,和n=28进行按位或=31 n |= n >>> 2; //将n>>>1的结果按位或赋值给n,第三次结束n>>>4=0,和n=31进行按位或=31 n |= n >>> 4; //将n>>>1的结果按位或赋值给n,第四次结束n>>>8=0,和n=31进行按位或=31 n |= n >>> 8; //将n>>>1的结果按位或赋值给n,第五次结束n>>>16=0,和n=31进行按位或=31 n |= n >>> 16; //最后n就是31,在这里对n进行了+1, n=32 return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }

     

    put

    public V put(K key, V value) { return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //hash int hash = spread(key.hashCode()); int binCount = 0; //死循环table数组 for (ConcurrentHashMap.Node<K,V>[] tab = table;;) { ConcurrentHashMap.Node<K,V> f; int n, i, fh; //table空,就创建 if (tab == null || (n = tab.length) == 0) tab = initTable(); //定位index,从table数组取出对应数据,如果是空,创建node对象,cas方式更新到对应index上 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new ConcurrentHashMap.Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } //table不是空,对应index位置数据不是null,并且对应节点上的hash值是 -1了,说明正在扩容 //那么就帮助去扩容 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { //table不是空,对应index位置数据不是null,并且对应节点上的hash值不是-1 V oldVal = null; //加锁 synchronized (f) { if (tabAt(tab, i) == f) { //f.hash if (fh >= 0) { binCount = 1; for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } //找到e的next是null的,表示链表最后一条 //当前put的数据组成node,放到e的next上 ConcurrentHashMap.Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new ConcurrentHashMap.Node<K,V>(hash, key, value, null); break; } } } //如果是红黑树 else if (f instanceof ConcurrentHashMap.TreeBin) { ConcurrentHashMap.Node<K,V> p; binCount = 2; if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //看链表是否到了转红黑树的地步 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }

    帮助扩容 helpTransfer

    /** * Helps transfer if a resize is in progress. * 承接上一段put代码,走到这个方法说明当前要put的这个node在数组中有相同的hash值, * 并且正在扩容,当前线程帮助扩容操作 * tab node的数组 * f 当前要put的node * */ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //f instanceof ForwardingNode,这个就是可能因为有多线程同时put一个值, //例如第一个put进去了但是触发了扩容, //那么到第二个线程过来的时候这个f指向的node就可能是ForwardingNode类型 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }

     

    treeifyBin

    当链表长度大于等于阀值8的时候,要转成红黑树,具体方法是treeifyBin

    /** * Replaces all linked nodes in bin at given index unless table is * too small, in which case resizes instead. */ private final void treeifyBin(ConcurrentHashMap.Node<K,V>[] tab, int index) { ConcurrentHashMap.Node<K,V> b; int n, sc; if (tab != null) { //数组长度小于64,扩容到当前长度的2倍 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); //找到对应的数组下标记录,也就是桶 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //上互斥锁 synchronized (b) { //double Check if (tabAt(tab, index) == b) { ConcurrentHashMap.TreeNode<K,V> hd = null, tl = null; //循环node,也就是链表 for (ConcurrentHashMap.Node<K,V> e = b; e != null; e = e.next) { ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V>(e.hash, e.key, e.val, null, null); //这里当前node没有前一个节点的话,他就是链表的第一个,那么就是红黑树的根 //hd=head, tl=tail if ((p.prev = tl) == null) hd = p; else tl.next = p;// //第一次循环的时候 hd=tl,都是p tl = p; } //最后node链表的数据会转成treeNode的链表,然后执行TreeBin setTabAt(tab, index, new ConcurrentHashMap.TreeBin<K,V>(hd)); } } } } }

    红黑树初始化TreeBin

    红黑树的要求:https://zh.wikipedia.org/wiki/红黑树

    节点是红色或黑色。根是黑色。所有叶子都是黑色(叶子是NIL节点)。每个红色节点必须有两个黑色的子节点。(从每个叶子到根的所有路径上不能有两个连续的红色节点。)从任一节点到其每个叶子的所有简单路径都包含相同数目的黑色节点。

     

     

    tryPresize

    /** * putAll和转红黑树的时候会进入这个方法,试着调整大小 */ private final void tryPresize(int size) { //新数组的大小 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; //sc = sizeCtl=扩容前的数组长度,或者是扩容的阀值 while ((sc = sizeCtl) >= 0) { ConcurrentHashMap.Node<K,V>[] tab = table; int n; //当前数组是空的,或者长度=0。为什么等于空,是调用了putAll方法创建的ConcurrentHashMap if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; //将SIZECTL的值CAS替换成-1。表示正在初始化,多线程只有一个线程更新了SIZECTL if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") //创建node数组 ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n]; table = nt; //这个计算类似N乘以了0。75,例如16>>>2=4, 16-4=12 ,16*0.75=12 sc = n - (n >>> 2); } } finally { //sizeCtl这里等于了扩容的阀值了 sizeCtl = sc; } } } //长度不合理就break else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { int rs = resizeStamp(n); //正在扩容 if (sc < 0) { ConcurrentHashMap.Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //sc < 0, 说明sizeCtl = -1 就是正在扩容 CAS更新SIZECTL if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //准备扩容 CAS更新SIZECTL = rs << 16 + 2 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); } } }

     

    TreeBin

    /** * Creates bin with initial set of nodes headed by b. * 红黑树初始化 */ TreeBin(ConcurrentHashMap.TreeNode<K,V> b) { //node hash=-2 super(TREEBIN, null, null, null); this.first = b; ConcurrentHashMap.TreeNode<K,V> r = null; //循环treenode的节点链表 for (TreeNode<K,V> x = b, next; x != null; x = next) { //当前treeNode节点的下一个treeNode next = (TreeNode<K,V>)x.next; //设置左边子节点,右边子节点,null x.left = x.right = null; //treeNode的第一个节点,定义为红黑树的根,根绝红黑树定义,根必须是黑色的 if (r == null) { //当前根节点的父节点null x.parent = null; //当前节点不是红色 x.red = false; r = x; } else { //循环到这里的话,r=根节点,x是treenode的next节点 //key K k = x.key; //key的hash值 int h = x.hash; //排序 Class<?> kc = null; for (TreeNode<K,V> p = r;;) { int dir, ph; K pk = p.key; if ((ph = p.hash) > h) dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) //进入到这里就是为了给key排序,k<=pk 返回-1,不然返回1,没结果返回0 dir = tieBreakOrder(k, pk); TreeNode<K,V> xp = p; //小于等于0,节点左边,,,大于0,节点右边 等于null, //说明当前节点的左边或者右边=null //说白了这个dir<=0, 放在xp的左边 // dir>0 , 放在xp的右边 if ((p = (dir <= 0) ? p.left : p.right) == null) { x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; //再平衡 r = balanceInsertion(r, x); break; } } } } //设置根节点 this.root = r; //检查红黑树 assert checkInvariants(root); }

    最新回复(0)