HashMap 与 ConcurrentHashMap 的实现原理

2021.01.07 06:01 34
阅读约 5 分钟

🥤HashMap 与CurrentHashMap 

在讨论ConcurrentHashMap之前,我们有必要先了解一下HashMap这个东东。哈希表(hash table)也叫散列表,是一种非常重要的数据结构,在java开发过程中,小🔥伴免不了要与HashMap接触,但是可能仅限于接触,对底层的知识欠缺一个系统的认知,所以让我们一起来复习一下吧。

① 🙋什么是哈希表

让我们大致看一下其他类型数据结构在新增,查找等基础操作执行性能。

类型性能
数组采用一段连续的存储单元来存储数据。对于指定下标的查找,时间复杂度为O(1);通过给定值进行查找,需要遍历数组,逐一比对给定关键字和数组元素,时间复杂度为O(n),当然,对于有序数组,则可采用二分查找,插值查找,斐波那契查找等方式,可将查找复杂度提高为O(logn);对于一般的插入删除操作,涉及到数组元素的移动,其平均复杂度也为O(n)
线性链表对于链表的新增,删除等操作(在找到指定操作位置后),仅需处理结点间的引用即可,时间复杂度为O(1),而查找操作需要遍历链表逐一进行比对,复杂度为O(n)
二叉树对一棵相对平衡的有序二叉树,对其进行插入,查找,删除等操作,平均复杂度均为O(logn)
哈希表相比上述几种数据结构,在哈希表中进行添加,删除,查找等操作,性能十分之高,不考虑哈希冲突的情况下,仅需一次定位即可完成,时间复杂度为O(1)

我们知道,数据结构的物理存储结构只有两种:顺序存储结构链式存储结构像栈,队列,树,图等是从逻辑结构去抽象的,映射到内存中,也这两种物理组织形式),而在上面我们提到过,在数组中根据下标查找某个元素,一次定位就可以达到,哈希表利用了这种特性,哈希表的主干就是数组。

1.1 哈希函数

比如我们要新增或查找某个元素,我们通过把当前元素的关键字 通过某个函数映射到数组中的某个位置,通过数组下标一次定位就可完成操作。储存位置=关键字

元素插入操作

这个函数ƒ一般称为哈希函数,举个栗子👉:比如我们想要在哈希表中执行插入操作,先通过哈希函数计算得到储存地址,然后执行插入。

查找操作同理。

但是事情没有那么简单,在插入过程中我们可能会遇到一个问题,如果两个元素计算得到的哈希值一样的时候怎么办?这就是典型的哈希冲突问题。

1.2 哈希冲突

这里需要提一下哈希函数的设计,设计良好的哈希函数会尽量保证计算简单散列地址的均匀分布,但是我们需要知道,数组是一块连续的固定长度内存空间,再好的哈希函数也不能保证得到的存储地址绝对不发生冲突。

那么哈希冲突如何解决呢?哈希冲突的解决方案有多种:开放定址法(发生冲突,继续寻找下一块未被占用的存储地址),再散列函数法,链地址法,而HashMap即是采用了链地址法,也就是数组+链表的方式。

关于哈希冲突解决方法的详解可以查看:哈希冲突解决方法

② 🧜‍♂️HashMap实现原理(java 8)

了解了HashMap的基础知识之后,我们来看看java是如何设计HashMap的吧。HashMap的主干是一个Node数组。Node是HashMap的基本组成单元,每一个Node包含一个key-value键值对。

transient Node<K,V>[] table;

Node是一个静态内部类

 static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        V value;
        Node<K,V> next;

        Node(int hash, K key, V value, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
 }

所以,HashMap的整体结构如👇

简单来说,HashMap由数组+链表组成的,数组是HashMap的主体,链表则是主要为了解决哈希冲突而存在的,如果定位到的数组位置不含链表(当前entry的next指向null),那么对于查找,添加等操作很快,仅需一次寻址即可;如果定位到的数组包含链表,对于添加操作,其时间复杂度依然为O(1),因为最新的Entry会插入链表头部,仅需要简单改变引用链即可,而对于查找操作来讲,此时就需要遍历链表,然后通过key对象的equals方法逐一比对查找。所以,性能考虑,HashMap中的链表出现越少,性能才会越好。

链表转红黑树

在执行插入操作时,若链表的长度 binCount >=7,那么将会转为红黑树。

  if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
        treeifyBin(tab, hash);

其他几个重要字段

//实际存储的key-value键值对的个数
transient int size;
//阈值,当table == {}时,该值为初始容量(初始容量默认为16);当table被填充了,也就是为table分配内存空间后,threshold一般为 capacity*loadFactory。HashMap在进行扩容时需要参考threshold,后面会详细谈到
int threshold;
//负载因子,代表了table的填充度有多少,默认是0.75
final float loadFactor;
//用于快速失败,由于HashMap非线程安全,在对HashMap进行迭代时,如果期间其他线程的参与导致HashMap的结构发生变化了(比如put,remove等操作),需要抛出异常ConcurrentModificationException
transient int modCount;

initialCapacity默认为16,loadFactory默认为0.75

hash函数

static final int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
 }

哈希函数计算出来的值,通过下面的代码进行寻位插入,其中i为下标

   if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);

(n - 1) & hash保证获取的index一定在数组范围内,举个例子,默认容量16,length-1=15,h=18,转换成二进制计算为

     1  0  0  1  0
    &   0  1  1  1  1
    __________________
        0  0  0  1  0    = 2

  最终计算出的index=2。有些版本的对于此处的计算会使用 取模运算,也能保证index一定在数组范围内,不过位运算对计算机来说,性能更高一些(HashMap中有大量位运算)

④ 😺重写equals方法需同时重写hashCode方法

HashMap源码分析到此就告一段落了,最后我们再来盘一个面试题,为什么<<重写equals方法需同时重写hashCode方法>>,举个例子

/**
 * Created by chengxiao on 2016/11/15.
 */
public class MyTest {
    private static class Person{
        int idCard;
        String name;
 
        public Person(int idCard, String name) {
            this.idCard = idCard;
            this.name = name;
        }
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()){
                return false;
            }
            Person person = (Person) o;
            //两个对象是否等值,通过idCard来确定
            return this.idCard == person.idCard;
        }
 
    }
    public static void main(String []args){
        HashMap<Person,String> map = new HashMap<Person, String>();
        Person person = new Person(1234,"乔峰");
        //put到hashmap中去
        map.put(person,"天龙八部");
        //get取出,从逻辑上讲应该能输出“天龙八部”
        System.out.println("结果:"+map.get(new Person(1234,"萧峰")));
    }
}

实际输出结果:

结果:null

如果我们已经对HashMap的原理有了一定了解,这个结果就不难理解了。尽管我们在进行get和put操作的时候,使用的key从逻辑上讲是等值的(通过equals比较是相等的),但由于没有重写hashCode方法,所以put操作时,key(hashcode1)-->hash-->indexFor-->最终索引位置 ,而通过key取出value的时候 key(hashcode2)-->hash-->indexFor-->最终索引位置,由于hashcode1不等于hashcode2,导致没有定位到一个数组位置而返回逻辑上错误的值null(也有可能碰巧定位到一个数组位置,但是也会判断其entry的hash值是否相等,上面get方法中有提到。)

所以,在重写equals的方法的时候,必须注意重写hashCode方法,同时还要保证通过equals判断相等的两个对象,调用hashCode方法要相等。而如果equals判断不相等的两个对象,其hashCode可以相同(只不过会发生哈希冲突,应尽量避免)。

注意:

  1. 如果两个对象相同,就是适用于equals(java.lang.Object) 方法,那么这两个对象的hashCode一定要相同
  2. 两个对象的hashCode相同,并不一定表示两个对象就相同,也就是不一定适用于equals(java.lang.Object) 方法,只能够说明这两个对象在散列存储结构中,如Hashtable,他们“存放在同一个篮子里”。

⑤ 😲什么时候需要重写?

一般的地方不需要重载hashCode,只有当类需要放在HashTable、HashMap、HashSet等等hash结构的集合时才会重载hashCode,那么为什么要重载hashCode呢?就HashMap来说,好比HashMap就是一个大内存块,里面有很多小内存块,小内存块里面是一系列的对象,可以利用hashCode来查找小内存块hashCode%size(小内存块数量),所以当equal相等时,hashCode必须相等,而且如果是object对象,必须重载hashCode和equal方法。

🎸ConcurrentHashMap实现原理

ConcurrentHashMap是Java并发包中提供的一个线程安全且高效的HashMap实现,ConcurrentHashMap在并发编程的场景中使用频率非常之高,本文就来分析下ConcurrentHashMap的实现原理,并对其实现原理进行分析(JDK1.8).

思考下面的问题:

  • Q1:为什么多线程环境下不能使用HashMap?
    A:多线程环境下HashMap可能会导致循环链表,造成CPU100%
  • Q2: JDK 1.8 HashMap是否是线程安全的?
    A:JDK 1.8虽然解决了链表闭环的问题,但是由于hashmap在设计的时候并未考虑线程安全的问题,所以在多线程环境中使用,在扩容时仍然可能造成数据丢失,并不是线程安全的。
  • Q3:ConcurrentHaspMap的数组长度为什么只能是2的N次方?
    A:为了避免hash冲突,尽可能的散列数据。(下文会详细讲解)
  • Q4:ConcurrentHaspMap是如何保证线程安全的?
    A:通过CAS+synchronized来保证。(下文会详细讲解)
  • Q5:ConcurrentHaspMap中数据存储的可能形式有哪些?
    A:数组、链表、红黑树
  • Q6:ConcurrentHaspMap的扩容机制是什么?
    A:多线程通过CAS+synchronized并发扩容。(下文会详细讲解)。

① 🥽两种结构的比较

众所周知,哈希表是中非常高效,复杂度为O(1)的数据结构,在Java开发中,我们最常见到最频繁使用的就是HashMap和HashTable,但是在线程竞争激烈的并发场景中使用都不够合理。

类型特性
HashMap先说HashMap,HashMap是线程不安全的,在并发环境下,可能会形成环状链表(扩容时可能造成,具体原因自行百度google或查看源码分析),导致get操作时,cpu空转,所以,在并发环境中使用HashMap是非常危险的。
HashTableHashTable和HashMap的实现原理几乎一样,差别无非是1.HashTable不允许key和value为null;2.HashTable是线程安全的。但是HashTable线程安全的策略实现代价却太大了,简单粗暴,get/put所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。

HashTable性能差主要是由于所有操作需要竞争同一把锁。

② 基本原理

  • 内部持有一个Node<K,V>数组,用来存放key,value
    • 这个数组的默认长度是16,并且只会在第一个put的时候才会初始化
  • put的时候要通过运算得到相应存放的数组下标,然后根据不同的情况决定初始化数组、插入链表、插入红黑树或者协助扩容。
    • 先进行hash扰动
    • 数组如果还未进行初始化,则先初始化。如果指定了初始化大小,则会计算一个>=指定值,且为2的N次幂的数字,且最接近当前参数的数字作为初始长度。
    • 当前位置==null,则直接通过cas插入
    • 如果当前数组正在扩容,则协助扩容
    • 当前位置≠null。如果当前节点是红黑树,则直接插入树中。否则作为链表插入。
    • 插入成功后,如果是链表,则检查是否需要转成红黑树。转换条件是链表节点数>=8,且数组长度>64。
    • 最后更新size的值,检查是被否需要扩容。
  • get的时候同样通过计算数组下表,然后遍历
    • 先进性hash扰动,使用hash&(n-1)得到数组索引。
    • 取索引对应的数据进行遍历。可能是链表、红黑树,也可能是FWD节点。
  • size++的时候,先尝试用更新volatile baseCount,更新失败再尝试定位到具体的CounteCell,用CAS或者直接更新其volatile value

③ 类构成

属性

数组最大长度,2的30次方。之所以不是2的32次方,是因为前两位用于控制
private static final int MAXIMUM_CAPACITY = 1 << 30;
默认数组大小16
private static final int DEFAULT_CAPACITY = 16;
链表转红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
红黑树转链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
链表转红黑树时需满足的最小数组长度
static final int MIN_TREEIFY_CAPACITY = 64;
表示正在转移
static final int MOVED     = -1; // hash for forwarding nodes
红黑树的hash值
static final int TREEBIN   = -2; // hash for roots of trees
static final int RESERVED  = -3; // hash for transient reservations
//hash扰乱时用来将数值转换为正数
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
CPU可用核心个数
static final int NCPU = Runtime.getRuntime().availableProcessors();


存放数据的数组
transient volatile Node<K,V>[] table;
要转移到的目标数组,只有扩容的时候非空
private transient volatile Node<K,V>[] nextTable;
计数器conut,用来统计size
private transient volatile long baseCount;
-1/初始化 0/默认值  >0 未初始化之前的长度/初始化之后的阈值  <-1 特殊数字+n记录正在扩容的线程数
private transient volatile int sizeCtl;
近似于多线程扩容时当前线程可以取到的最大下标
private transient volatile int transferIndex;
用来计数的数组
private transient volatile CounterCell[] counterCells;

Node-链表类,用于储存key-value

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        指向下个元素的指针
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

TreeNode-用来构建TreeBin,红黑树的节点

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

TreeBin-存放红黑树的首节点和根节点

 static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; /

ForwardingNode-占位节点,插入链表头部,表示正在被转移

static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

④ 🎯如何保证线程安全的实现

4.1 取值

public V get(Object key) {
  Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  int h = spread(key.hashCode());
  if ((tab = table) != null && (n = tab.length) > 0 &&
      (e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
      if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val;
    }
    else if (eh < 0)
      return (p = e.find(h, key)) != null ? p.val : null;
    while ((e = e.next) != null) {
      if (e.hash == h &&
          ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val;
    }
  }
  return null;
} 

首先根据key计算出哈希值,如果找到了就直接返回值。如果是红黑树的话,就在红黑树中查找值,否则就按照链表的查找方式查找。

这与HashMap也差不多的,元素会首先以链表的方式进行存储,如果该桶中的元素数量大于TREEIFY_THRESHOLD的值,就会触发树化。将当前的链表转换为红黑树。因为如果数量太多的话,链表的查询效率就会变得非常低,时间复杂度为O(n),而红黑树的查询时间复杂度则为O(logn),这个阈值在Java 1.8中的默认值为8,定义如下。

static final int TREEIFY_THRESHOLD = 8;

4.2 put

final V putVal(K key, V value, boolean onlyIfAbsent) {
  if (key == null || value == null) throw new NullPointerException();
  //对key的hashCode进行散列
  int hash = spread(key.hashCode());
  int binCount = 0;
  //一个无限循环,直到put操作完成后退出循环
  for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    //当Node数组为空时进行初始化
    if (tab == null || (n = tab.length) == 0)
      tab = initTable();
    //Unsafe类volatile的方式取出hashCode散列后通过与运算得出的Node数组下标值对应的Node对象
    //此时的Node对象若为空,则代表还未有线程对此Node进行插入操作
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
      //直接CAS方式插入数据
      if (casTabAt(tab, i, null,
                   new Node<K,V>(hash, key, value, null)))
        //插入成功,退出循环
        break;                   // no lock when adding to empty bin
    }
    //查看是否在扩容,先不看,扩容再介绍
    else if ((fh = f.hash) == MOVED)
      //帮助扩容
      tab = helpTransfer(tab, f);
    else {
      V oldVal = null;
      //对Node对象进行加锁
      synchronized (f) {
        //二次确认此Node对象还是原来的那一个
        if (tabAt(tab, i) == f) {
          if (fh >= 0) {
            binCount = 1;
            //无限循环,直到完成put
            for (Node<K,V> e = f;; ++binCount) {
              K ek;
              //和HashMap一样,先比较hash,再比较equals
              if (e.hash == hash &&
                  ((ek = e.key) == key ||
                   (ek != null && key.equals(ek)))) {
                oldVal = e.val;
                if (!onlyIfAbsent)
                  e.val = value;
                break;
              }
              Node<K,V> pred = e;
              if ((e = e.next) == null) {
                //和链表头Node节点不冲突,就将其初始化为新Node作为上一个Node节点的next
                //形成链表结构
                pred.next = new Node<K,V>(hash, key,
                                          value, null);
                break;
              }
            }
          }
          ...
}
              }
              Node<K,V> pred = e;
              if ((e = e.next) == null) {
                //和链表头Node节点不冲突,就将其初始化为新Node作为上一个Node节点的next
                //形成链表结构
                pred.next = new Node<K,V>(hash, key,
                                          value, null);
                break;
              }
            }
          }
          ...
}
 static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
 }
  • hash扰动
  • 死循环put,直到成功
    • 数组未初始化,则进行初始化
    • 元素为空,则进行CAS插入
    • 元素正在转移,则协助转移
    • 存在hash冲突,则锁住头节点,进行插入
  • size+1&检查扩容

4.3 初始化

构造方法

 
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        //传进来的参数大于最大容量的一倍,则直接使用最大容量。因为默认都是2倍扩容,当大于等于最大值的一半时,再进行2倍扩容反而得不偿失,不如直接使用最大容量。
        //小于最大容量的一半,则会计算出一个大于等于1.5*initialCapacity+1,且为2的N次幂的数字,作为初始数组长度。
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        //在构造函数被调用时,sizeCtl存放的是上面计算得到的初始数组长度
        this.sizeCtl = cap;
    }

构造函数仅仅计算了一个需要初始化的数组长度,并未进行实际的数组的初始化。

初始化数组

在JDK1.8中,初始化ConcurrentHashMap的时候这个Node[]数组是还未初始化的,会等到第一次put方法调用时才初始化:

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //判断Node数组为空
            if (tab == null || (n = tab.length) == 0)
                //初始化Node数组
                tab = initTable();
          ...
}

此时是会有并发问题的,如果多个线程同时调用initTable初始化Node数组怎么办?cas+volatile+自旋

private final Node<K,V>[] initTable() {
  Node<K,V>[] tab; int sc;
  //每次循环都获取最新的Node数组引用
  while ((tab = table) == null || tab.length == 0) {
    //sizeCtl是一个标记位,若为-1也就是小于0,代表有线程在进行初始化工作了
    if ((sc = sizeCtl) < 0)
      //让出CPU时间片
      Thread.yield(); // lost initialization race; just spin
    //CAS操作,将本实例的sizeCtl变量设置为-1
    else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
      //如果CAS操作成功了,代表本线程将负责初始化工作
      try {
        //再检查一遍数组是否为空
        if ((tab = table) == null || tab.length == 0) {
          //在初始化Map时,sizeCtl代表数组大小,默认16
          //所以此时n默认为16
          int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
          @SuppressWarnings("unchecked")
          //Node数组
          Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
          //将其赋值给table变量
          table = tab = nt;
          //通过位运算,n减去n二进制右移2位,相当于乘以0.75
          //例如16经过运算为12,与乘0.75一样,只不过位运算更快
          sc = n - (n >>> 2);
        }
      } finally {
        //将计算后的sc(12)直接赋值给sizeCtl,表示达到12长度就扩容
        //由于这里只会有一个线程在执行,直接赋值即可,没有线程安全问题
        //只需要保证可见性
        sizeCtl = sc;
      }
      break;
    }
  }
  return tab;
}

tableSizeFor

  private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

这个方法是用来计算数组的长度。我们都知道数组的长度一定是2的N次幂,那么这个方法的返回值一定是2的N次幂,且最接近给定的参数c。那么是如何实现的呢

 假设c=11
-----------------------------
int n=11-1=10;
-----------n |= n >>> 1------------------
n:            00000001010(省略高位0,下同)
n>>>1          00000000101
n|n>>>1        00000001111
n:             00000001111
------------------------------------------
-----------n |= n >>> 2------------------
n>>>2          00000000011
n|n>>>2        00000001111
n:             00000001111
------------------------------------------
-----------n |= n >>> 4------------------
n>>>4          00000000000
n|n>>>4        00000001111
n:             00000001111
------------------------------------------
-----------n |= n >>> 8------------------
n>>>8          00000000000
n|n>>>8        00000001111
n:             00000001111
------------------------------------------
-----------n |= n >>> 16------------------
n>>>16          00000000000
n|n>>>16        00000001111
n:             00000001111
------------------------------------------
n=00000001111 即十进制 15
return n+1=16

可以看到,核心原理是将给定数字的从第一个非0位置开始,全部转换为1,放在上面的例子里就是将二进制1010转换为1111,最后再加上1,刚好是我们想要的2的N次幂。至于为什么要是2的N次幂,我们放在下面会说。

4.4 CAS插入

元素为空,则插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
transient volatile Node<K,V>[] table;
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

4.5 为什么这里的n必须是2的N次幂,可以为其它数字吗,比如15,18?

这里的n必须是2的N次幂,不可以为其它数字,即使调用构造函数时你传入了15,最终也只会计算得到一个2的N次幂,来作为初始化数组的长度。这么做的目的是使计算得到的下标i,尽可能分散,也就是尽可能减少hash冲突

假设n=15                                                                                   假设n=16
则n-1=14                                                                                   则n-1=15
假设hashcode=xxxx(省略高位0,下同)                                                        假设hashcode=xxxx
---------------------------------                                                        ---------------------------------
1110                                                                                     1111  
& hash                                                                                   & hash
xxxx                                                                                     xxxx
  
---------------------------------                                                       ---------------------------------
已看到,从左至右
第一位可能的值有两种 0/1                                                                 第一位可能的值有两种 0/1
第二位可能的值也有两种 0/1                                                               第二位可能的值也有两种 0/1
第三位可能的值也有两种 0/1                                                               第三位可能的值也有两种 0/1
第四位的值则只能是 0,无论x是0还是1                                                       第四位可能的值也有两种 0/1  
可能的组合种类 2*2*2=8种                                                                 可能的组合种类 2*2*2*2=16种 

低位全部是1因此hash时更加分散。

4.6 协助扩容

如果put的时候,发现数组正在扩容,则协助扩容,毕竟扩容是一个耗时的操作,多线程扩容可以大大加快扩容的速度。那么是如何实现的呢?

  final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
       数组不为空&当前节点为FWD节点&转移的目标数组不为空
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            根据数组长度计算一个唯一的数据戳(一个很大的负数)
            int rs = resizeStamp(tab.length);
           检查nextTab和table都未发生更改,且sizeCtl<0
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    扩容结束
                    break;
                将sc设置未sc+1
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    扩容
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }

这里有几个退出扩容的条件值得注意

  • (sc >>> RESIZE_STAMP_SHIFT) != rs
  • sc == rs + 1,没有线程进行扩容了即扩容结束了
  • sc == rs + MAX_RESIZERS,达到最大帮助线程数65535

4.7 锁住头节点,插入数据

如果节点不为空,且hash值!=-1,则证明存在hash冲突,则需要使用synchronized进行加锁。从这里可知,如果多个线程同时put,可能会阻塞(计算得到的索引相同)

 V oldVal = null;
                synchronized (f) {
                    再次检查
                    if (tabAt(tab, i) == f) {
                        链表
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                如果是链表,则检查是否需要转树
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }

代码逻辑比较清晰,链表的话则是替换或者插入链表尾部,红黑树的话,则是构造成TreeBin或者TreeNode

字数:1359 发布于 2 个月前
Copyright 2018-2021 Siques