博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发编程总结4-JUC-REENTRANTLOCK-2(公平锁)
阅读量:5951 次
发布时间:2019-06-19

本文共 11407 字,大约阅读时间需要 38 分钟。

 内容包括:
  1、ReentrantLock函数分析
  2、ReentrantLock公平锁源码
-------------------------------------------
  
ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。
  ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
  ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否,ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
一、ReentrantLock函数列表
1 // 创建一个 ReentrantLock ,默认是“非公平锁”。 2 ReentrantLock() 3 // 创建策略是fair的 ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。 4 ReentrantLock(boolean fair) 5 // 查询当前线程保持此锁的次数。 6 int getHoldCount() 7 // 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。 8 protected Thread getOwner() 9 // 返回一个 collection,它包含可能正等待获取此锁的线程。10 protected Collection
getQueuedThreads()11 // 返回正等待获取此锁的线程估计数。12 int getQueueLength()13 // 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。14 protected Collection
getWaitingThreads(Condition condition)15 // 返回等待与此锁相关的给定条件的线程估计数。16 int getWaitQueueLength(Condition condition)17 // 查询给定线程是否正在等待获取此锁。18 boolean hasQueuedThread(Thread thread)19 // 查询是否有些线程正在等待获取此锁。20 boolean hasQueuedThreads()21 // 查询是否有些线程正在等待与此锁有关的给定条件。22 boolean hasWaiters(Condition condition)23 // 如果是“公平锁”返回true,否则返回false。24 boolean isFair()25 // 查询当前线程是否保持此锁。26 boolean isHeldByCurrentThread()27 // 查询此锁是否由任意线程保持。28 boolean isLocked()29 // 获取锁。30 void lock()31 // 如果当前线程未被中断,则获取锁。32 void lockInterruptibly()33 // 返回用来与此 Lock 实例一起使用的 Condition 实例。34 Condition newCondition()35 // 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。36 boolean tryLock()37 // 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。38 boolean tryLock(long timeout, TimeUnit unit)39 // 试图释放此锁。40 void unlock()
ReentrantLock函数列表

 二、基本概念

 1. AQS -- 指AbstractQueuedSynchronizer类。

    AQS是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。AQS是独占锁(例如,ReentrantLock)和共享锁(例如,Semaphore)的公共父类。

 2. AQS锁的类别 -- 分为“独占锁”和“共享锁”两种。

    (01) 独占锁 -- 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过CLH等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视CLH等待队列而直接获取锁。独占锁的典型实例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是独占锁。

    (02) 共享锁 -- 能被多个线程同时拥有,能被共享的锁。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享锁。

 3. CLH队列

    CLH队列是AQS中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH就是管理这些“等待锁”的线程的队列。

    CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。

 4. CAS函数 -- Compare And Swap 

    CAS函数,是比较并交换函数,它是原子操作函数;即,通过CAS操作的数据都是以原子方式进行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。

 三、JUC-公平锁-获取锁

  获取锁

public void lock() {        //调用FairSync的lock方法        sync.lock();    }   //继承Sync   Sync继承AbstractQueuedSynchronizer类    static final class FairSync extends Sync {        private static final long serialVersionUID = -3000897897090466540L;        final void lock() {           //调用AbstractQueuedSynchronizer的acquire方法            acquire(1);        }        /**         * Fair version of tryAcquire.  Don't grant access unless         * recursive call or no waiters or is first.         */        protected final boolean tryAcquire(int acquires) {            final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                if (!hasQueuedPredecessors() &&                    compareAndSetState(0, acquires)) {                    setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0)                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }    }
lock

 AQS 的acquire()实现

public final void acquire(int arg) {      //tryAcquire尝试获取锁       //addWaiter(Node.EXCLUSIVE), arg)如果失败新增等待节点      //acquireQueued根据队列获取锁        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();    }
aqs实现的acquire()

(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。

(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。 

FairSync.tryAcquire()

/**         * Fair version of tryAcquire.  Don't grant access unless         * recursive call or no waiters or is first.         */ protected final boolean tryAcquire(int acquires) {
// 获取“当前线程”    final Thread current = Thread.currentThread();    // 获取“独占锁”的状态    int c = getState();    // c=0意味着“锁没有被任何线程锁拥有”,    if (c == 0) {        // 若“锁没有被任何线程锁拥有”,        // 则判断“当前线程”是不是CLH队列中的第一个线程线程,        // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。        if (!hasQueuedPredecessors() &&            compareAndSetState(0, acquires)) {            setExclusiveOwnerThread(current);            return true;        }    }    else if (current == getExclusiveOwnerThread()) {        // 如果“独占锁”的拥有者已经为“当前线程”,        // 则将更新锁的状态。        int nextc = c + acquires;        if (nextc < 0)            throw new Error("Maximum lock count exceeded");        setState(nextc);        return true;    }    return false;}

 tryAcquire()的作用就是尝试去获取锁,尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。

 

hasQueuedPredecessors()

public final boolean hasQueuedPredecessors() {    Node t = tail;     Node h = head;    Node s;    return h != t &&        ((s = h.next) == null || s.thread != Thread.currentThread());}

hasQueuedPredecessors() 是通过判断"当前线程"是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程。

Node

private transient volatile Node head;    // CLH队列的队首private transient volatile Node tail;    // CLH队列的队尾// CLH队列的节点static final class Node {    static final Node SHARED = new Node();    static final Node EXCLUSIVE = null;    // 线程已被取消,对应的waitStatus的值    static final int CANCELLED =  1;    // “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。    // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。    static final int SIGNAL    = -1;    // 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值    static final int CONDITION = -2;    // (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值    static final int PROPAGATE = -3;   
// waitStatus为“CANCELLED, SIGNAL, CONDITION, PROPAGATE”时分别表示不同状态,    // 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。    volatile int waitStatus;    // 前一节点    volatile Node prev;    // 后一节点    volatile Node next;    // 节点所对应的线程    volatile Thread thread;    // nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”    // 若nextWaiter=SHARED,则CLH队列是“独占锁”队列;    // 若nextWaiter=EXCLUSIVE,(即nextWaiter=null),则CLH队列是“共享锁”队列。    Node nextWaiter;    // “共享锁”则返回true,“独占锁”则返回false。    final boolean isShared() {        return nextWaiter == SHARED;    }    // 返回前一节点    final Node predecessor() throws NullPointerException {        Node p = prev;        if (p == null)            throw new NullPointerException();        else            return p;    }    Node() {    // Used to establish initial head or SHARED marker    }    // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。    Node(Thread thread, Node mode) {     // Used by addWaiter        this.nextWaiter = mode;        this.thread = thread;    }    // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。    Node(Thread thread, int waitStatus) { // Used by Condition        this.waitStatus = waitStatus;        this.thread = thread;    }}

addWaiter()

private Node addWaiter(Node mode) {    // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。    Node node = new Node(Thread.currentThread(), mode);    Node pred = tail;    // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。    enq(node);    return node;}

addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        // interrupted表示在CLH队列的调度中,        // “当前线程”在休眠时,有没有被中断过。        boolean interrupted = false;        for (;;) {            // 获取上一个节点。            // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

acquireQueued()的目的是从队列中获取锁

shouldParkAfterFailedAcquire

// 返回“当前线程是否应该阻塞”private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    // 前继节点的状态    int ws = pred.waitStatus;    // 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。    if (ws == Node.SIGNAL)        return true;    // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点”  为  “‘原前继节点’的前继节点”。    if (ws > 0) {        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {        // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}

 

如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。

如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。

selfInterrupt

 

private static void selfInterrupt() {    Thread.currentThread().interrupt();}

 

如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。

在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!

四、JUC-公平锁-释放锁 

unlock()

public void unlock() {    sync.release(1);}

“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。

release()

public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。

protected final boolean tryRelease(int releases) {    // c是本次释放锁之后的状态    int c = getState() - releases;    // 如果“当前线程”不是“锁的持有者”,则抛出异常!    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();    boolean free = false;    // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。    if (c == 0) {        free = true;        setExclusiveOwnerThread(null);    }    // 设置当前线程的锁的状态。    setState(c);    return free;}

tryRelease()的作用是尝试释放锁。

(01) 如果“当前线程”不是“锁的持有者”,则抛出异常。
(02) 如果“当前线程”在本次释放锁操作之后,对锁的拥有状态是0(即,当前线程彻底释放该“锁”),则设置“锁”的持有者为null,即锁是可获取状态。同时,更新当前线程的锁的状态为0。

unparkSuccessor()

 

private void unparkSuccessor(Node node) {    // 获取当前线程的状态    int ws = node.waitStatus;    // 如果状态<0,则设置状态=0    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    //获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。    // 这里的有效,是指“后继节点对应的线程状态<=0”    Node s = node.next;    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    // 唤醒“后继节点对应的线程”    if (s != null)        LockSupport.unpark(s.thread);}

 

在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。

根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。

 

 

 

 

 

 

转载于:https://www.cnblogs.com/guoliangxie/p/6697855.html

你可能感兴趣的文章
java操作mongodb(连接池)(转)
查看>>
Android入门(十一)SQLite CURD
查看>>
一个双线程下同一时候操作指针变量导致野指针出现的问题总结
查看>>
Servlet过滤器和监听器
查看>>
js闭包应用
查看>>
AndroidStudio
查看>>
10-10-归并排序-内部排序-第10章-《数据结构》课本源码-严蔚敏吴伟民版
查看>>
JAVA的节点流和处理流
查看>>
jQuery如何退出each循环的?
查看>>
细说JavaScript对象(2):原型对象
查看>>
Sql server在另一台服务器,在Visual Studio 中没问题,IIS中 提示“在与 SQL Server 建立连接时出现与网络相关的或特定于实例的错误。。。。”...
查看>>
Wordpress 所有hoor列表
查看>>
搭建windows的solr6服务器
查看>>
Linux命令(20)linux服务器之间复制文件和目录
查看>>
你知道C#中的Lambda表达式的演化过程吗?
查看>>
聊一聊PV和并发
查看>>
ASCII码表
查看>>
Maven的作用总结
查看>>
設置Linux保留物理內存並使用 (1)
查看>>
Android画一条横线
查看>>