并发编程总结(二):JUC下的锁

1.JUC的包结构

我们查看下java.util.concurrent.locks包下面,发现主要包含如下类:

image-20200301111726969

构建UML图如下:

image-20200301113556313

JUC包下主要有三把锁:

  1. ReentrantLock
  2. StampedLock,其相较于ReentrantReadWriteLock,多了个乐观读,其实就是用于校验读取的数据是否被修改了
  3. ReentrantReadWriteLock

我们可以看到上述三个锁除了StampedLock,顶层都是AbstractQueueSynchronizer类,因此我们来介绍下该类

2.AbstractQueueSynchronizer

AbstractQueuedSynchronizer,简写为AQS,抽象队列同步器,许多同步器都可以通过AQS很容易并且高效的构造出来,以下都是通过ASQ构造出来的:ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLockSynchronousQueueFutureTask

2.1.AQS原理

AQS的作用:通过队列来辅助实现线程同步。线程并发争夺state资源,争夺失败的则进入等待队列(同步队列)并进入阻塞状态,在state资源被释放之后,从队列头唤醒被阻塞的线程节点,进行state资源的竞争

AQS的抽象:AQS将最难写的频繁出队入队操作、线程的阻塞唤醒操作已模板的方式写好了,实现类只需要将模板的相关方法进行实现,即可实现不一样的锁或者同步器

AQS使用了模板方法,把同步队列都封装起来了,同时提供了以下五个未实现的方法,用于子类的重写:

方法签名方法描述
boolean tryAcquire(int arg)尝试以独占模式进行获取。 此方法应查询对象的状态是否允许以独占模式获取对象,如果允许则获取它。如果获取失败,则将当前线程加入到等待队列,直到其他线程唤醒。
boolean tryRelease(int arg)尝试以独占模式释放锁。
int tryAcquireShared(int arg)尝试以共享模式获取锁,此方法应查询对象的状态是否允许以共享模式获取对象,如果允许则获取它。如果获取失败,则将当期线程加入到等待队列,直到其他线程唤醒。
boolean tryReleaseShared(int arg)尝试以共享模式释放锁。
boolean isHeldExclusively()是否独占模式

2.2.AQS的数据结构

image-20200307215156707

AQS中包含的变量

  • state:所有线程通过CAS尝试给state设值,当state>0时表示线程被占用,同一个线程多次获取state,会叠加state的值,从而实现了可重入
  • exclusiveOnwerThread:在独占模式下这个属性会用到,当线程尝试以独占模式成功给state设值之后,该变量会存储独占的线程。
  • 等待队列(同步队列):等待队列中存放了所有针对state失败的线程,是一个双向链表结构。state被某一个线程占用之后,其余线程会进入等待队列等待,等待state被释放(state=0),释放state的线程会唤醒等待队列中的线程继续尝试cas设值state
  • head:指向等待队列的头节点,延迟初始化,除了初始化之外,只能通过setHead方法进行修改
  • tail:指向等待队列的队尾,延迟初始化,只能通过enq方法修改tail,该方法主要是往队列后面添加等待节点

AQS队列节点Node类的数据结构

  • pre:指向队列的上一个节点
  • waitStatus:节点的等待状态,初始化为0,表示正常同步等待
    • CANCELLED:1 节点因超时或者被中断而取消时设置为取消状态
    • SIGNAL:-1 指示当前节点被释放后,需要调用unpark通知后面节点,如果后面节点发生竞争导致获取锁失败,也会将当前节点设置为SIGNAL
    • CONDITION:-2 指示该线程正在进行条件等待,条件队列中会用到
    • PROPAGATE:-3 共享模式下释放节点时设置的状态,表示无限传播下去
  • thread:当前节点操作的线程
  • nextWaiter:该字段在Condition条件等待中会用到,指向条件队列的下一个节点。或者链接到SHARED常量,表示节点正在以共享模式等待
  • next:指向队列的下一个节点

2.3.LockSupport.park(Object blocker)和LockSupport.unpark(Thread thread)

如果想要了解AQS的实现,您需要先知道以下这些内容,因为源码中会大量使用:

AQS中线程的阻塞和唤醒基本上都是使用这两个方法实现的,其底层都是依赖的Unsafe实现的

LockSupport是用来创建锁和其他同步类的基本线程阻塞的原语:

此类与每一个线程都关联一个许可(permit:0 表示无许可,1 表示有许可),如果有许可,将立即返回对park()的调用,如果没有则阻塞当前调用线程,调用unpark(线程1)可使许可有效,此时被阻塞的线程得以执行,unpark可以先于park执行也没关系,此时unpark的入参线程调用park方法时,会直接执行

该类中常见的两个方法两个方法:

  • park(Object blocker):实现线程的阻塞。除非有许可,否则出于线程调度目的将阻塞线程;如果有许可,则将许可消耗,然后线程往下继续执行;
  • unpark(Thread thread):实现解除线程的阻塞。如果线程在park方法上被阻塞,则调用该方法将取消阻塞。否则,许可变为1,保证下一次调用park方法不会阻塞。

这两个方法底层是调用了Unsafe中的park和unpark的native方法。

2.3.AQS中的一般处理流程

为了弄清楚AQS中是如何进行队列同步的,我们先从一个简单的独占加锁方法说起

2.3.1.public final void acquire(int arg)

以独占模式获取锁,忽略中断。 通过至少调用一次tryAcquire ,成功返回。 否则线程会排队,可能会反复阻塞和解除阻塞,调用tryAcquire直到成功。 此方法可用于实现方法Lock.lock 。

我们先看一下这个方法的入口代码:

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&  // 尝试获取锁,这里是一个在AQS中未实现的方法,具体由子类实现
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // 获取不到锁,则 1.addWaiter添加到等待队列 2.acquireQueued不断循环等待重试
    selfInterrupt();//如果没有获取到锁并且进入了等待队列,则end
}

tryAcquire(arg)

  1. 线程1、线程2通过方法tryAcquire()尝试获取锁同时去获取锁
  2. 获取成功的线程会返回true,一般都是通过CAS对state进行设置值尝试获取锁

image-20200311103257880

不同的锁有不同的tryAcquire的实现,所以你可以看到ReentrantLock锁里面会有非公平锁和公平锁的实现方式

ReentrantLock公平锁的实现代码是在获取锁之前通过!hasQueuedPredecessors(),判断当前线程处于等待队列的节点前面没有别的节点了,此时才会去获取锁

addWaiter(Node.EXCLUSIVE)

获取锁失败之后,则在等待队列之后追加一个节点,通过CAS进行追加,追加失败会循环重试,如果追加的时候发现head节点还不存在,可以先初始化一个head节点,然后追加上去

image-20200311103646009

源码如下:

/**
 * 为当前线程和给定模式创建和排队节点
 */
private Node addWaiter(Node mode) {
    //初始化一个新节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //拿到尾节点
    Node pred = tail;
    if (pred != null) {
        //将新的节点指向尾节点
        node.prev = pred;
        // 尝试用新节点取代原来的尾节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
   // 如果当前尾指针为空,则调用enq方法去初始化tail及head
    enq(node);
    return node;
}

acquireQueued(final Node node, int arg)

  1. 线程节点进入等待队列后,执行该方法,不断循环判断当前节点是否在head节点的后继节点,如果是则去尝试tryAcquire()获取锁,如果获取成功,则将当前节点作为head节点,并将next设置为null,head节点只是起到标识作用,每次处理的都是head的下一个节点
  2. 如果当前节点(线程A)竞争锁失败或不是头结点的下一个节点,则会将前面的节点状态设置为SIGNAL(代表该节点执行完成后需要调用LockSupport的unpark(线程A)方法),如果前面的节点状态>0,表示这个节点被取消,移除队列,然后通过parkAndCheckInterrupt()调用LockSupport.park(this)挂起线程
  3. 上述步骤1、2会for循环一直执行,类似于wait的范式,当线程从park处唤醒后,会再去执行上述步骤1、2

image-20200311103510923

final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // 获取该节点的上一个节点,判断是否头节点,如果是则尝试获取锁
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        // 获取锁成功,把当前节点变为头节点
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      // 判断是否需要阻塞线程,该方法中会把取消状态的节点移除掉,并且把当前节点的前一个节点设置为SIGNAL
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

大家看AQS的源码的时候,可以发现这里的线程阻塞与唤醒基本上是用一个循环+LockSupport.park+LockSupport.unpark实现的

2.3.2.public final boolean release(int arg)

源码如下:

public final boolean release(int arg) {
  if (tryRelease(arg)) { // 尝试释放锁
    Node h = head;
    if (h != null && h.waitStatus != 0)  // 如果头节点waitStatus不为0,则唤醒后续线程节点继续处理
      unparkSuccessor(h);
    return true;
  }
  return false;
}

tryRelease()具体实现由其子类实现,一般是让state值减一

  1. 释放锁成功,并且头结点waitStatus != 0那么会调用unparkSuccessor()通知唤醒后续的线程节点进行处理

    注意:在遍历队列查找唤醒下一个节点的过程中,如果发现下一个节点状态是CANCELLED那么就会忽略这个节点,然后从队列尾部向前遍历,找到与头结点最近的没有被取消的节点进行唤醒操作

  2. 线程A被唤醒之后,其又会从acquireQueued()方法被阻塞出继续执行,其流程和上述acquireQueued()方法一致

image-20200311103813051

image-20200311104443722

3.使用AQS实现的锁

例如ReentrantLock

  • lock(): 调用该方法会使锁计数器加1,如果共享资源最初是空闲的,则将锁定并授予线程;
  • unlock(): 调用该方法使锁计数器减1,当计数达到0的时候,将释放资源;
  • tryLock(): 如果资源没有被任何其他线程占用,那么该方法返回true,并且锁计数器加1。如果资源不是空闲的,则该方法返回false。这个时候线程不会阻塞,而是直接退出返回结果;
  • lockInterruptible(): 该方法使得资源空闲时允许该线程在获取资源时被其他线程中断。也就是说:如果当前线程正在等待锁,但其他线程请求该锁,则当前线程将被中断并立即返回,不会继续等待获取锁;
  • getHoldCount(): 获取资源上持有的锁的计数器;
  • isHeldByCurrentThread: 如果资源锁有当前线程持有,则此方法返回true。

参考链接:

https://www.itzhai.com/articles/aqs-and-lock-implementation-in-concurrent-packages.html#fn1

https://baijiahao.baidu.com/s?id=1666548481761194849&wfr=spider&for=pc

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×