ConcurrentHashMap之实现细节

    xiaoxiao2026-01-06  10

    ConcurrentHashMap 的实现原理:

    锁分离(Lock striping)

    ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。

    有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。不变性是多线程编程占有很重要的地位,下面还要谈到。

    Java代码   /**   * The segments, each of which is a specialized hash table   */   final Segment<K,V>[] segments;  

    ConcurrentHashMap初始化方法是通过initialCapacity,loadFactor, concurrencyLevel几个参数来初始化segments数组,段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组。

    初始化segments数组。让我们来看一下初始化segmentShift,segmentMask和segments数组的源代码。

    查看源代码 打印 帮助 01 if (concurrencyLevel > MAX_SEGMENTS) 02   03 concurrencyLevel = MAX_SEGMENTS; 04   05 // Find power-of-two sizes best matching arguments 06   07 int sshift = 0; 08   09 int ssize = 1; 10   11 while (ssize < concurrencyLevel) { 12   13 ++sshift; 14   15 ssize <<= 1; 16   17 } 18   19 segmentShift = 32 - sshift; 20   21 segmentMask = ssize - 1; 22   23 this.segments = Segment.newArray(ssize);

    CurrentHashMap的初始化一共有三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,代表ConcurrentHashMap内部的Segment的数量,ConcurrentLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

    整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数,同样使为了加快hash的过程。

    这边需要特别注意一下两个变量,分别是segmentShift和segmentMask,这两个变量在后面将会起到很大的作用,假设构造函数确定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。

    不变(Immutable)和易变(Volatile) ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点,其结构如下所示:

    static final class HashEntry<K,V> { 2. final K key; 3. final int hash; 4. volatile V value; 5. final HashEntry<K,V> next; 6. } static final class HashEntry<K,V> { 2. final K key; 3. final int hash; 4. volatile V value; 5. final HashEntry<K,V> next; 6. } 可以看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next 引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。

    ConcurrentHashMap的get操作

    前面提到过ConcurrentHashMap的get操作是不用加锁的,我们这里看一下其实现:

     

    Java代码   public V get(Object key) {       int hash = hash(key.hashCode());       return segmentFor(hash).get(key, hash);   }    

    看第三行,segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数,我们看下这个函数的实现:

    Java代码   final Segment<K,V> segmentFor(int hash) {       return segments[(hash >>> segmentShift) & segmentMask];   }  

    这个函数用了位操作来确定Segment,根据传入的hash值向右无符号右移segmentShift位,然后和segmentMask进行与操作,结合我们之前说的segmentShift和segmentMask的值,就可以得出以下结论:假设Segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个Segment中。

    在确定了需要在哪一个segment中进行操作以后,接下来的事情就是调用对应的Segment的get方法:

     

    Java代码   V get(Object key, int hash) {       if (count != 0) { // read-volatile           HashEntry<K,V> e = getFirst(hash);           while (e != null) {               if (e.hash == hash && key.equals(e.key)) {                   V v = e.value;                   if (v != null)                       return v;                   return readValueUnderLock(e); // recheck               }               e = e.next;           }       }       return null;   }    

    先看第二行代码,这里对count进行了一次判断,其中count表示Segment中元素的数量,我们可以来看一下count的定义:

    Java代码   transient volatile int count;    

    可以看到count是volatile的,实际上这里里面利用了volatile的语义:

     

     写道 对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。

    因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。

    然后,在第三行,调用了getFirst()来取得链表的头部:

     

    Java代码   HashEntry<K,V> getFirst(int hash) {       HashEntry<K,V>[] tab = table;       return tab[hash & (tab.length - 1)];   }    

    同样,这里也是用位操作来确定链表的头部,hash值和HashTable的长度减一做与操作,最后的结果就是hash值的低n位,其中n是HashTable的长度以2为底的结果。

    在确定了链表的头部以后,就可以对整个链表进行遍历,看第4行,取出key对应的value的值,如果拿出的value的值是null,则可能这个key,value对正在put的过程中,如果出现这种情况,那么就加锁来保证取出的value是完整的,如果不是null,则直接返回value。

    ConcurrentHashMap的put操作

    看完了get操作,再看下put操作,put操作的前面也是确定Segment的过程,这里不再赘述,直接看关键的segment的put方法:

     

    Java代码   V put(K key, int hash, V value, boolean onlyIfAbsent) {       lock();       try {           int c = count;           if (c++ > threshold) // ensure capacity               rehash();           HashEntry<K,V>[] tab = table;           int index = hash & (tab.length - 1);           HashEntry<K,V> first = tab[index];           HashEntry<K,V> e = first;           while (e != null && (e.hash != hash || !key.equals(e.key)))               e = e.next;               V oldValue;           if (e != null) {               oldValue = e.value;               if (!onlyIfAbsent)                   e.value = value;           }           else {               oldValue = null;               ++modCount;               tab[index] = new HashEntry<K,V>(key, hash, first, value);               count = c; // write-volatile           }           return oldValue;       } finally {           unlock();       }   }    

    首先对Segment的put操作是加锁完成的,然后在第五行,如果Segment中元素的数量超过了阈值(由构造函数中的loadFactor算出)这需要进行对Segment扩容,并且要进行rehash,关于rehash的过程大家可以自己去了解,这里不详细讲了。

    第8和第9行的操作就是getFirst的过程,确定链表头部的位置。

    第11行这里的这个while循环是在链表中寻找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果没有找到,则进入21行这里,生成一个新的HashEntry并且把它加到整个Segment的头部,然后再更新count的值。

    ConcurrentHashMap的remove操作

    Remove操作的前面一部分和前面的get和put操作一样,都是定位Segment的过程,然后再调用Segment的remove方法:

     

    Java代码   V remove(Object key, int hash, Object value) {       lock();       try {           int c = count - 1;           HashEntry<K,V>[] tab = table;           int index = hash & (tab.length - 1);           HashEntry<K,V> first = tab[index];           HashEntry<K,V> e = first;           while (e != null && (e.hash != hash || !key.equals(e.key)))               e = e.next;               V oldValue = null;           if (e != null) {               V v = e.value;               if (value == null || value.equals(v)) {                   oldValue = v;                   // All entries following removed node can stay                   // in list, but all preceding ones need to be                   // cloned.                   ++modCount;                   HashEntry<K,V> newFirst = e.next;                   for (HashEntry<K,V> p = first; p != e; p = p.next)                       newFirst = new HashEntry<K,V>(p.key, p.hash,                                                     newFirst, p.value);                   tab[index] = newFirst;                   count = c; // write-volatile               }           }           return oldValue;       } finally {           unlock();       }   }    

    首先remove操作也是确定需要删除的元素的位置,不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了,我们之前已经说过HashEntry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去,看一下下面这一幅图来了解这个过程:

    假设链表中原来的元素如上图所示,现在要删除元素3,那么删除元素3以后的链表就如下图所示:

    首先看下get操作,同样ConcurrentHashMap的get操作是直接委托给Segment的get方法,直接看Segment的get方法:

     

    Java代码   V get(Object key, int hash) {       if (count != 0) { // read-volatile           HashEntry<K,V> e = getFirst(hash);           while (e != null) {               if (e.hash == hash && key.equals(e.key)) {                   V v = e.value;                   if (v != null)                       return v;                   return readValueUnderLock(e); // recheck               }               e = e.next;           }       }       return null;   }  

     

    get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count变量,通过这种机制保证get操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是volatile的,也能保证读取到最新的值。接下来就是对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。

     

    最后,如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为put的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index] = new HashEntry<K,V>(key, hash, first, value),在这条语句中,HashEntry构造函数中对value的赋值以及对tab[index]的赋值可能被重新排序,这就可能导致结点的值为空。这种情况应当很罕见,一旦发生这种情况,ConcurrentHashMap采取的方式是在持有锁的情况下再读一遍,这能够保证读到最新的值,并且一定不会为空值。

     

    Java代码   V readValueUnderLock(HashEntry<K,V> e) {       lock();       try {           return e.value;       } finally {           unlock();       }   }    

     

    另一个操作是containsKey,这个实现就要简单得多了,因为它不需要读取值:

    Java代码   boolean containsKey(Object key, int hash) {       if (count != 0) { // read-volatile           HashEntry<K,V> e = getFirst(hash);           while (e != null) {               if (e.hash == hash && key.equals(e.key))                   return true;               e = e.next;           }       }       return false;   }  

    有些操作需要涉及到多个段,比如说size(), containsValaue()。先来看下size()方法:

     

    Java代码   public int size() {       final Segment<K,V>[] segments = this.segments;       long sum = 0;       long check = 0;       int[] mc = new int[segments.length];       // Try a few times to get accurate count. On failure due to       // continuous async changes in table, resort to locking.       for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {           check = 0;           sum = 0;           int mcsum = 0;           for (int i = 0; i < segments.length; ++i) {               sum += segments[i].count;               mcsum += mc[i] = segments[i].modCount;           }           if (mcsum != 0) {               for (int i = 0; i < segments.length; ++i) {                   check += segments[i].count;                   if (mc[i] != segments[i].modCount) {                       check = -1// force retry                       break;                   }               }           }           if (check == sum)               break;       }       if (check != sum) { // Resort to locking all segments           sum = 0;           for (int i = 0; i < segments.length; ++i)               segments[i].lock();           for (int i = 0; i < segments.length; ++i)               sum += segments[i].count;           for (int i = 0; i < segments.length; ++i)               segments[i].unlock();       }       if (sum > Integer.MAX_VALUE)           return Integer.MAX_VALUE;       else           return (int)sum;   }  

     

    size方法主要思路是先在没有锁的情况下对所有段大小求和,如果不能成功(这是因为遍历过程中可能有其它线程正在对已经遍历过的段进行结构性更新),最多执行RETRIES_BEFORE_LOCK次,如果还不成功就在持有所有段锁的情况下再对所有段大小求和。在没有锁的情况下主要是利用Segment中的modCount进行检测,在遍历过程中保存每个Segment的modCount,遍历完成之后再检测每个Segment的modCount有没有改变,如果有改变表示有其它线程正在对Segment进行结构性并发更新,需要重新计算。

     

     

    其实这种方式是存在问题的,在第一个内层for循环中,在这两条语句sum += segments[i].count; mcsum += mc[i] = segments[i].modCount;之间,其它线程可能正在对Segment进行结构性的修改,导致segments[i].count和segments[i].modCount读取的数据并不一致。这可能使size()方法返回任何时候都不曾存在的大小,很奇怪javadoc居然没有明确标出这一点,可能是因为这个时间窗口太小了吧。size()的实现还有一点需要注意,必须要先segments[i].count,才能segments[i].modCount,这是因为segment[i].count是对volatile变量的访问,接下来segments[i].modCount才能得到几乎最新的值(前面我已经说了为什么只是“几乎”了)。这点在containsValue方法中得到了淋漓尽致的展现:

     

     

    Java代码   public boolean containsValue(Object value) {       if (value == null)           throw new NullPointerException();          // See explanation of modCount use above          final Segment<K,V>[] segments = this.segments;       int[] mc = new int[segments.length];          // Try a few times without locking       for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {           int sum = 0;           int mcsum = 0;           for (int i = 0; i < segments.length; ++i) {               int c = segments[i].count;               mcsum += mc[i] = segments[i].modCount;               if (segments[i].containsValue(value))                   return true;           }           boolean cleanSweep = true;           if (mcsum != 0) {               for (int i = 0; i < segments.length; ++i) {                   int c = segments[i].count;                   if (mc[i] != segments[i].modCount) {                       cleanSweep = false;                       break;                   }               }           }           if (cleanSweep)               return false;       }       // Resort to locking all segments       for (int i = 0; i < segments.length; ++i)           segments[i].lock();       boolean found = false;       try {           for (int i = 0; i < segments.length; ++i) {               if (segments[i].containsValue(value)) {                   found = true;                   break;               }           }       } finally {           for (int i = 0; i < segments.length; ++i)               segments[i].unlock();       }       return found;   }    

    同样注意内层的第一个for循环,里面有语句int c = segments[i].count; 但是c却从来没有被使用过,即使如此,编译器也不能做优化将这条语句去掉,因为存在对volatile变量count的读取,这条语句存在的唯一目的就是保证segments[i].modCount读取到几乎最新的值。关于containsValue方法的其它部分就不分析了,它和size方法差不多。

     

     

    跨段方法中还有一个isEmpty()方法,其实现比size()方法还要简单,也不介绍了。最后简单地介绍下迭代方法,如keySet(), values(), entrySet()方法,这些方法都返回相应的迭代器,所有迭代器都继承于Hash_Iterator类(提交时居然提醒我不能包含sh It,只得加了下划线),里实现了主要的方法。其结构是:

     

    Java代码   abstract class Hash_Iterator{       int nextSegmentIndex;       int nextTableIndex;       HashEntry<K,V>[] currentTable;       HashEntry<K, V> nextEntry;       HashEntry<K, V> lastReturned;   }  

     

     nextSegmentIndex是段的索引,nextTableIndex是nextSegmentIndex对应段中中hash链的索引,currentTable是nextSegmentIndex对应段的table。调用next方法时主要是调用了advance方法:

     

     

    Java代码   final void advance() {       if (nextEntry != null && (nextEntry = nextEntry.next) != null)           return;          while (nextTableIndex >= 0) {           if ( (nextEntry = currentTable[nextTableIndex--]) != null)               return;       }          while (nextSegmentIndex >= 0) {           Segment<K,V> seg = segments[nextSegmentIndex--];           if (seg.count != 0) {               currentTable = seg.table;               for (int j = currentTable.length - 1; j >= 0; --j) {                   if ( (nextEntry = currentTable[j]) != null) {                       nextTableIndex = j - 1;                       return;                   }               }           }       }   }  

    不想再多介绍了,唯一需要注意的是跳到下一个段时,一定要先读取下一个段的count变量。 

     

    这种迭代方式的主要效果是不会抛出ConcurrentModificationException。一旦获取到下一个段的table,也就意味着这个段的头结点在迭代过程中就确定了,在迭代过程中就不能反映对这个段节点并发的删除和添加,对于节点的更新是能够反映的,因为节点的值是一个volatile变量。

          关于    putIfAbsent()函数的一些理解 :

       该函数的最直接的意思是如果查询的结果不存在,那么久添加进去,否则就不添加,在高并发上能够有效的防止被重复添加。

          V putIfAbsent(K key, V value)

                 If the specified key is not already associated with a value,associate it with the given value. This is equivalent to if (!map.containsKey(key))      return map.put(key, value);    else     return map.get(key); except that the action is performed atomically.   这是官方的说明,相当于   if (!map.containsKey(key)) return map.put(key, value); else return map.get(key); public static Pattern getPattern(String pattern) { Pattern ret = compliedPattern.get(pattern); if (ret == null) { Pattern newPattern = Pattern.compile(pattern); ret = compliedPattern.putIfAbsent(pattern, Pattern.compile(pattern)); if (ret == null) { ret = newPattern; } } return ret; }

    最新回复(0)