本文共 23964 字,大约阅读时间需要 79 分钟。
废话就不多说了,直入正题。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable
// 默认初始化容量,这个容量指的是所有Segment中的hash桶的数量和static final int DEFAULT_INITIAL_CAPACITY = 16;// 每个Segment的默认的加载因子static final float DEFAULT_LOAD_FACTOR = 0.75f;// 整个Map的默认的并发级别,代表最大允许16个线程同时进行并发修改操作。实际并发级别要是 2^n 这种数,同时这个变量也是数组segments的长度static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 每个Segment的最大hash桶的大小(数组Segment.table的最大长度),初始容量也不能大于此值static final int MAXIMUM_CAPACITY = 1 << 30;// 数组segments的最大长度,也是最大并发级别static final int MAX_SEGMENTS = 1 << 16; // slightly conservative,翻译过来是略保守,实际远远用不到这么多Segment啊// size方法和containValue方法最大的不加锁尝试次数// 简单点说就是先不加锁执行一次,如果没发现改变,就返回,发现改变就再重复一次,再发现改变就所有对segment都加锁再操作一次,这样设计主要是为了避免加锁提高效率。static final int RETRIES_BEFORE_LOCK = 2;
// 段掩码,跟子网掩码以及HashMap中的 table.length - 1 的作用差不多,都是用为了用位运算加速hash散列定位final int segmentMask;// 段偏移量,定位到一个segment使用的是hash值的高位,segments数组长度为2^n的话,此数值为32-n,也就是把最高的n位移动到最低的n位final int segmentShift;// 段数组final Segment三、基本类[] segments;// 下面都懂transient Set keySet;transient Set > entrySet;transient Collection values;
static final class HashEntry{ final K key; final int hash; volatile V value; final HashEntry next; HashEntry(K key, int hash, HashEntry next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } @SuppressWarnings("unchecked") static final HashEntry [] newArray(int i) { return new HashEntry[i]; }}
// Segment是一个特殊的hash表,写上继承ReentrantLock只是只是为了简化一些锁定,避免单独new一个ReentrantLock。static final class Segmentextends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; // 类似size,外部修改Segment结构的都会修改这个变量,它是volatile的,几个读方法在一开始就判断这个值是否为0,能尽量保证不做无用的读取操作。 transient volatile int count; // 跟集合类中的modCount一样,检测到这个数值变化说明Segment一定有被修改过 // 因为modCount不是volatile,有可能无法反映出一次修改操作的中间状态(历史的一致状态/未来的一致状态), // 这些状态本身在当时就不应该被看见,看见了也没事,所以使用普通变量也合理 transient int modCount; // 扩容阈值 transient int threshold; // 类似HashMap的table数组 transient volatile HashEntry [] table; // 加载因子,所有segment都是一样的,放在这里就不需要依赖外部类了 final float loadFactor; Segment(int initialCapacity, float lf) { loadFactor = lf; setTable(HashEntry. newArray(initialCapacity)); } @SuppressWarnings("unchecked") static final Segment [] newArray(int i) { return new Segment[i]; } // 只有持有本segment的锁或者是构造方法中才能调用这个方法,反序列化构造时也有用这个 void setTable(HashEntry [] newTable) { threshold = (int)(newTable.length * loadFactor); table = newTable; } // 跟indexFor算法一样 HashEntry getFirst(int hash) { HashEntry [] tab = table; return tab[hash & (tab.length - 1)]; } // 加锁读取value,在直接读取value得到null时调用 // 源码这里有英文注释:读到value为null,只有当某种重排序的HashEntry初始化代码让volatile变量初始化重排序到构造方法外面时才会出现, // 这一点旧的内存模型下是合法的,但是不知道会不会发生。 // 具体怎么发生,就是节点的构造方法执行完了,但是value的赋值重排序到构造方法外面,然后节点被引用了,挂载成了HashEntry链第一个节点, // 此时可以读取到这个节点,但是value的赋值不一定被执行了,value是默认值null // 搜下stackoverflow,说在新内存模型下已经不会发生了,是作者自己理解有些问题,后续的1.7也修复了 // http://stackoverflow.com/questions/5002428/concurrenthashmap-reorder-instruction/5144515#5144515 V readValueUnderLock(HashEntry e) { lock(); try { return e.value; } finally { unlock(); } } // 下面的方法是为了实现map中的方法,名字都很像 // 直接读value是不用加锁的,碰到读到value == null,才加锁再读一次,这个前面说了,后面的也一样 V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; } boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) // ConcurrentHashMap禁止添加null key,所以这里不用考虑key为null的情况,下同 return true; e = e.next; } } return false; } // 跟get差不多 boolean containsValue(Object value) { if (count != 0) { // read-volatile HashEntry [] tab = table; int len = tab.length; for (int i = 0 ; i < len; i++) { for (HashEntry e = tab[i]; e != null; e = e.next) { V v = e.value; if (v == null) // recheck v = readValueUnderLock(e); if (value.equals(v)) return true; } } } return false; } // 几个写操作,需要加锁,下同 boolean replace(K key, int hash, V oldValue, V newValue) { lock(); try { HashEntry e = getFirst(hash); while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; boolean replaced = false; if (e != null && oldValue.equals(e.value)) { // replace不会修改modCount,这降低了containValue方法的准确性,jdk1.7修复了这一点 replaced = true; e.value = newValue; } return replaced; } finally { unlock(); } } V replace(K key, int hash, V newValue) { lock(); try { HashEntry e = getFirst(hash); while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { // replace不会修改modCount,这降低了containValue方法的准确性,jdk1.7修复了这一点 oldValue = e.value; e.value = newValue; } return oldValue; } finally { unlock(); } } // 基本思路跟1.6的HashMap一样 V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; // 先执行扩容,再添加节点,1.6的HashMap是先添加节点,再扩容 // 并且Segment这里是先用大于号判断大小,再count++,扩容时实际容量会比HashMap中同情况时多一个, // 会出现put完成后Segment.count > threshold应该扩容但是却没有扩容的情况,这不太符合设计 // eg:threshold = 12,那么在这个Segment上执行第13次put时,判断语句为 12++ > 12,为false,执行完成后Segment.count = 13,threshold = 12 // 另外默认构造情况下threshold = 0,而且每个Segment的table.length都为1,如果更改为符合设计的思路的扩容方式,第一次put又一定会触发扩容 // 1.7修改了上面两点,判断语句先让count加1,再用大于号执行判断,同时让table.length 最小值为2,这样第一次put也不会扩容,完善了整体的扩容机制 if (c++ > threshold) // ensure capacity rehash(); HashEntry [] tab = table; int index = hash & (tab.length - 1); // 定位方式和1.6的HashMap一样 HashEntry first = tab[index]; HashEntry e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) // put相同的key(相当于replace)不会修改modCount,这降低了containsValue方法的准确性,jdk1.7修复了这一点 e.value = value; } else { oldValue = null; ++modCount; // 也是添加在HashEntry链的头部,前面说了,这里的HashEntry的next指针是final的,new后就不能变 tab[index] = new HashEntry (key, hash, first, value); count = c; } return oldValue; } finally { unlock(); } } // 扩容时为了不影响正在进行的读线程,最好的方式是全部节点复制一次并重新添加 // 这里根据扩容时节点迁移的性质,最大可能的重用一部分节点,这个性质跟1.8的HashMap中的高低位是一个道理,必须要求hash值是final的 void rehash() { HashEntry [] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity >= MAXIMUM_CAPACITY) return; // 这里有段注释是说下面的算法及其作用的,跟1.8的HashMap的resize中用到的那个高低温的原理一样: // 扩容前在一个hash桶中的节点,扩容后只会有两个去向。这里是用 &,后续改为用高低位,实质上是一样的。 // 根据这个去向,找到最末尾的去向都一样的连续的一部分,这部分可以重用,不需要复制 // HashEntry的next是final的,resize/rehash时需要重新new,这里的特殊之处就是最大程度重用HashEntry链尾部的一部分,尽量减少重新new的次数 // 作者说从统计角度看,默认设置下有大约1/6的节点需要被重新复制一次,所以通常情况还是能节省不少时间的 HashEntry [] newTable = HashEntry.newArray(oldCapacity<<1); threshold = (int)(newTable.length * loadFactor); // 跟1.6的HashMap有一样的小问题,可能会过早变为Integer.MAX_VALUE从而导致后续永远不能扩容 int sizeMask = newTable.length - 1; for (int i = 0; i < oldCapacity ; i++) { // We need to guarantee that any existing reads of old Map can proceed. So we cannot yet null out each bin. // 为了保证其他线程能够继续执行读操作,不执行手动将原来table赋值为null,只是再最后修改一次table的引用 // 1.6的HashMap对应的地方有个src[j] = null,这句话在多线程时某些程度上能加快回收旧table数组,但是放在这里会影响其他线程的读操作 // 只要其他线程完成了读操作,就不会再引用旧HashEntry,旧的就会自动被垃圾回收器回收 // 下面有个关于resize中重用节点的示意图,可以看下 HashEntry e = oldTable[i]; if (e != null) { HashEntry next = e.next; int idx = e.hash & sizeMask; // Single node on list if (next == null) newTable[idx] = e; else { // Reuse trailing consecutive sequence at same slot HashEntry lastRun = e; int lastIdx = idx; // 这个循环是寻找HashEntry链最大的可重用的尾部 // 看过1.8的HashMap就知道,如果hash值是final的,那么每次扩容,扩容前在一条链表上的节点,扩容后只会有两个去向 // 这里重用部分中,所有节点的去向相同,它们可以不用被复制 for (HashEntry last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 把重用部分整体放在扩容后的hash桶中 // 复制不能重用的部分,并把它们插入到rehash后的所在HashEntry链的头部 for (HashEntry p = e; p != lastRun; p = p.next) { int k = p.hash & sizeMask; HashEntry n = newTable[k]; newTable[k] = new HashEntry (p.key, p.hash, n, p.value); } // 这里也可以看出,重用部分rehash后相对顺序不变,并且还是在rehash后的链表的尾部 // 不能重用那些节点在rehash后,再一个个重头添加到链表的头部,如果还在一条链表上面,那么不能重用节点的相对顺序会颠倒 // 所以ConcurrentHashMap的迭代顺序会变化,本身它也不保证迭代顺序不变 } } } table = newTable; } // 因为1.6的HashEntry的next指针是final的,所以比普通的链表remove要复杂些,只有被删除节点的后面可以被重用,前面的都要再重新insert一次 V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry [] tab = table; int index = hash & (tab.length - 1); HashEntry first = tab[index]; HashEntry e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay in list, but all preceding ones need to be cloned. // 因为next指针是final的,所以删除不能用简单的链表删除,需要把前面的节点都重新复制再插入一次,后面的节点可以重用 // 删除后,后面的可以重用的那部分顺序不变且还是放在最后,前面的被复制的那部分顺序颠倒地放在前面 // 后面Map.remove那里有个remove的示意图 ++modCount; HashEntry newFirst = e.next; for (HashEntry p = first; p != e; p = p.next) newFirst = new HashEntry (p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } } void clear() { if (count != 0) { lock(); try { HashEntry [] tab = table; for (int i = 0; i < tab.length ; i++) tab[i] = null; ++modCount; count = 0; // write-volatile } finally { unlock(); } } }}
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 寻找适合的2^n,以及n,默认构造时是16和4 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; // 定位Segment的index时hash值的移位,为32 - n,默认构造时是28 segmentMask = ssize - 1; // 用于 & 运算定位Segment的index,默认构造时是0x0f this.segments = Segment.newArray(ssize); // 实际的concurrentLevel就是Segment数组的长度 if (initialCapacity > MAXIMUM_CAPACITY) // 虽然整个Map的capacity可以大于MAXIMUM_CAPACITY,但是初始化时为了满足2^n的要求,最大也只允许这么多 initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; // 上面这段,求每个Segment代表的实际的hash表的table数组的长度(专业点叫hash桶的数量),这个值是cap,默认构造时是1 // 参数initailCapacity是一个容量参考值,用来计算每个Segment在初始化时有多少个hash桶(这个桶的数量要符合2^n) // 整个ConcurrentHashMap的容量capacity是所有Segment的容量和,在初始化时每个Segment都一样, // 但是每个Segment都可以单独扩容,所以构造完成后这个capacity值就没啥用了 for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment< K,V > (cap, loadFactor);}public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap(Map m) { this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); putAll(m);}五、一些基础方法
// Wang/Jenkins hash的变种算法// 因为算出来的hash的最高的几位用于定位段,最低的几位用于段内定位hash桶,所以高位低位都需要扰动,要两个方向移位// HashMap中算出来的hash值只有最低的几位会被用到,所以只对需要对低位进行操作,一个方向移位就行 ,不需要下面这种更复杂的hash函数private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16);}// 定位到某个具体的Segment,使用hash值的高位,算法和HashMap的indexFor一样// ConcurrentHashMap中没有抽象出indexFor了,实际实现还是一样的,看Segment的代码final SegmentsegmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask];}
// ConcurrentHashMap.get是通过hash值定位到Segment,有这个Segment的get来完成的public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash);}// 同getpublic boolean containsKey(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).containsKey(key, hash);}// containsValue和size虽然是读操作,但是会批量读取到所有Segment,所以特殊处理// 先不加锁尝试两次以获得比较近似的结果,如果contains就直接返回(因为有一个存在就是存在,不存在才需要遍历全部),不contains才继续,如果发现modCount被其他线程修改,就全部加锁再执行// 不加锁读两次时,可能会碰见写操作的中间状态,也可能在循环到后面时有线程修改了前面,所以这个方法不是100%准确的// 设计成这样主要是为了提高效率,很多业务还是可以接受这种误差,需要更强一致性的时候,可以自己写个方法// 上面Segment的分析中指出了,put相同的key、replace方法不会修改modCount,但是会改变value,这一点使得后面检测modCount是否改变可能成为无用功,让containsValue方法的准确性降低了,1.7进行了修复public boolean containsValue(Object value) { if (value == null) throw new NullPointerException(); final Segment[] segments = this.segments; int[] mc = new int[segments.length]; // 先不加锁执行RETRIES_BEFORE_LOCK = 2次 for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { int sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { int c = segments[i].count; // 这个c没哪里用,意义不明 mcsum += mc[i] = segments[i].modCount; // 就是 mc[i] = segments[i].count; mssum += mc[i],临时保存一份modCount if (segments[i].containsValue(value)) // 碰见contains直接return return true; } boolean cleanSweep = true; // mcsum是modCount的和,为0可以认为遍历开始时没有任何put完成过任何HashEntry,即方法开始执行时不包含任何HashEntry,可以认为(近似认为,几率比较大)此时也不包含 if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { int c = segments[i].count; if (mc[i] != segments[i].modCount) { // modCount改变,说明有其他线程修改了Segment的结构,退出循环。会有replace的问题,前面说了 cleanSweep = false; break; } } } if (cleanSweep) return false; } // 如果连续两次都碰见modCount改变的情况,这时候一次性对全部Segment加锁,最大程度保证遍历时的一致性 // 因为是全部加锁后再遍历,遍历开始后没有线程可以修改任何Segment的结构,可以保证当前线程得到的是准确值 for (int i = 0; i < segments.length; ++i) segments[i].lock(); boolean found = false; try { for (int i = 0; i < segments.length; ++i) { if (segments[i].containsValue(value)) { found = true; break; } } } finally { for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } return found;}// 不是Map接口的方法,为了兼容Hashtable,等价于containsValuepublic boolean contains(Object value) { return containsValue( value);}// 基本同containValue,但是只执行一次且不会加锁public boolean isEmpty() { final Segment [] segments = this.segments; int[] mc = new int[segments.length]; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { if (segments[i].count != 0) return false; else mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { if (segments[i].count != 0 || mc[i] != segments[i].modCount) return false; } } return true;}// 跟containsValue差不多,但是size不会受put相同的key、replace方法的影响// 注意最后一个int溢出处理,因为HashMap以及ConcurrentHashMap是个特殊的集合类,我们通常所说的容量是hash桶的数目,这并不是实际容量// 因为使用链表解决hash冲突的原因,实际的可以容纳得更多,可能会远远超多Integer.MAX_VALUE,这这时返回值就是个错误的值,但还是尽量返回了一个“比较有用”的值。// 这纯粹是历史原因造成的坑,返回个int,没考虑实际情况,1.8的新增了一个mappingCount方法,返回long型准确数字public int size() { final Segment [] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // 不加锁执行两次,如果两次数据不一样,或者碰到modCount++被修改了,就全部加锁在执行一次 for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } if (check != sum) { sum = 0; for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) // int溢出处理,因此返回值可能会是错误的。 // 并且因为兼容性的原因,这个还无法解决,只能新增一个方法,1.8的ConcurrentHashMap就是新增了一个返回long型的方法 return Integer.MAX_VALUE; else return (int)sum;}
// ConcurrentHashMap.put是通过hash值定位到Segment,有这个Segment的put来完成的// ConcurrentHashMap的实现中不允许null key和null valuepublic V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); // 这里 null key 会抛出NPE return segmentFor(hash).put(key, hash, value, false);}// 下面几个基本思路同put,都是代理给相应的Segment的对应方法进行操作public V putIfAbsent(K key, V value);public V remove(Object key);public boolean remove(Object key, Object value);public boolean replace(K key, V oldValue, V newValue);public V replace(K key, V value);// putAll和clear都是循环操作,没有全局加锁,在执行期间还是可以执行完成其他的写操作的,事务性比较差的方法,设计成不用全局锁是为了提高效率public void putAll(Map m) { for (Map.Entry e : m.entrySet()) put(e.getKey(), e.getValue());}public void clear() { for (int i = 0; i < segments.length; ++i) segments[i].clear();}put的简单个示意图如下(圆形中的数字是乱写的,非真实数据)。
final class WriteThroughEntry extends AbstractMap.SimpleEntry{ WriteThroughEntry(K k, V v) { super(k,v); } public V setValue(V value) { if (value == null) throw new NullPointerException(); V v = super.setValue(value); ConcurrentHashMap.this.put(getKey(), value); return v; }}