Kevin's blog Kevin's blog
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档

Kevin

你可以迷茫,但不可以虚度
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档
  • JVM性能调优

  • 并发编程

    • 📚多线程并发编程JUC知识点
    • JMM模型和volatile关键字
    • CPU缓存一致性协议MESI
    • JVM内置锁synchronized详解
    • 抽象队列同步器AQS应用Lock详解
      • ReentrantLock
        • ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
      • AQS具备特性
        • AQS内部维护属性
        • State三种访问方式
        • AQS定义两种资源共享方式
        • AQS定义两种队列
        • 主要实现以下几种方法
        • isHeldExclusively():
        • tryAcquire(int)
        • tryRelease(int)
        • tryAcquireShared(int)
        • tryReleaseShared(int)
      • 同步等待队列
      • 条件等待队列
      • ReentrantLock源码剖析
        • 一、AQS
        • 二、加锁
        • 一、从lock进入
        • 二、从非公平锁进去
        • 三、查看acquire方法
        • 四、查看tryAcquire方法
        • 五、查看addWaiter(Node.EXCLUSIVE)
        • 六、查看acquireQueued方法
      • AQS相关文章
    • 阻塞队列BlockingQueue详解
    • Tools&CountDownLatch&Semaphore原理与应用
    • Atomic&Unsafe魔法类详解
    • HashMap详解
    • Executor线程池原理与源码解读
  • MySql

  • spring

  • redis

  • zookeeper

  • rabbitMQ

  • 架构

  • 锁

  • 分库分表

  • 学习笔记
  • 并发编程
kevin
2022-05-19
目录

抽象队列同步器AQS应用Lock详解

# AQS应用之Lock

Java并发编程核心在于java.concurrent.util包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。

# ReentrantLock

ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步 手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比synchronized更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。

使用ReentrantLock进行同步
ReentrantLock lock = new ReentrantLock(false);//false为非公平锁, true为公平锁
lock.lock() //加锁
lock.unlock() //解锁
1
2
3
4

# ReentrantLock如何实现synchronized不具备的公平与非公平性呢?

在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:

  1. FairSync 公平锁的实现
  2. NonfairSync 非公平锁的实现

这两个类都继承自Sync,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。

上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现

# AQS具备特性

  1. 阻塞等待队列
  2. 共享/独占
  3. 公平/非公平
  4. 可重入
  5. 允许中断
  • 除了Lock外,Java.concurrent.util当中同步器的实现如Latch,Barrier,BlockingQueue等,都是基于AQS框架实现
  1. 一般通过定义内部类Sync继承AQS
  2. 将同步器所有调用都映射到Sync对应的方法

# AQS内部维护属性volatile int state (32位)

state表示资源的可用状态

# State三种访问方式

getState()、setState()、compareAndSetState()

# AQS定义两种资源共享方式

Exclusive-独占:只有一个线程能执行,如ReentrantLock

Share-共享:多个线程可以同时执行,如Semaphore/CountDownLatch

# AQS定义两种队列

  1. 同步等待队列
  2. 条件等待队列

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/ 唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时

# 主要实现以下几种方法

# isHeldExclusively():

该线程是否正在独占资源。只有用到condition才需要去 实现它。

# tryAcquire(int)

独占方式。尝试获取资源,成功则返回true,失败则返回 false。

# tryRelease(int)

独占方式。尝试释放资源,成功则返回true,失败则返回 false。

# tryAcquireShared(int)

共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

# tryReleaseShared(int)

共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

# 同步等待队列

AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

同步等待队列

# 条件等待队列

Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁

条件等待队列

# ReentrantLock源码剖析

volatile,CAS,队列问题。

# 一、AQS

AQS本质就是Java中的AbstractQueuedSynchronizer

在AbstractQueuedSynchronizer类中,有几个属性和一个双向队列(CLH队列)

AQS就是并发包下的一个基类

CountDownLatch,ReentrantLock,信号量……

static final class Node {
  
    // 排他锁的标识
    static final Node EXCLUSIVE = null;

    // 如果带有这个标识,证明是失效了~~
    static final int CANCELLED =  1;
  
	// 具有这个标识,说明后继节点需要被唤醒
    static final int SIGNAL    = -1;
   
	// Node对象存储标识的地方
    volatile int waitStatus;

	// 指向上一个节点
    volatile Node prev;

	// 指向下一个节点
    volatile Node next;

    // 当前Node绑定的线程
    volatile Thread thread;

  	// 返回前驱节点,如果前驱节点为null,抛出NPE
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

16468237960693011732ffy

ReentrantLock可以实现公平锁也可以实现非公平锁,也是互斥锁以及是可重入锁

# 二、加锁

# 一、从lock进入

public void lock() {
	// sync分为了公平和非公平
    sync.lock();
}
1
2
3
4

# 二、从非公平锁进去

final void lock() {
	// 通过CAS的方式尝试将state从0修改为1,如果返回true,代表修改成功,如果修改失败,返回false
    if (compareAndSetState(0, 1)){
		// 将一个属性设置为当前线程,这个属性是AQS的父类提供的
        setExclusiveOwnerThread(Thread.currentThread());
	} else {
        acquire(1);
	}
}
1
2
3
4
5
6
7
8
9

# 三、查看acquire方法

public final void acquire(int arg) {
	// tryAcquire再次尝试获取锁资源,如果尝试成功,返回true
    if (!tryAcquire(arg) &&
		// 获取锁资源失败后,需要将当前线程封装成一个Node,追加到AQS的队列中
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		// 线程中断
        selfInterrupt();
}
1
2
3
4
5
6
7
8

# 四、查看tryAcquire方法

final boolean nonfairTryAcquire(int acquires) {
	// 获取当前线程
    final Thread current = Thread.currentThread();
	// 获取AQS的state的值
    int c = getState();
	// 如果state为0,代表,尝试再次获取锁资源
    if (c == 0) {
		// CAS尝试修改state,从0-1,如果成功,设置ExclusiveOwnerThread属性为当前线程
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
	// 当前占有锁资源的线程是否是当前线程
    else if (current == getExclusiveOwnerThread()) {
		// 将state + 1
        int nextc = c + acquires;
		// 如果加1后,小于0,超所锁可重入的最大值,抛出Error
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
		// 没问题,就重新对state进行复制
        setState(nextc);
		// 锁重入成功
        return true;
    }
    return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 五、查看addWaiter(Node.EXCLUSIVE)

// 说明前面获取锁资源失败,放到队列中等待。。。
private Node addWaiter(Node mode) {
	// 创建Node类,并且设置thread为当前线程,设置为排它锁
    Node node = new Node(Thread.currentThread(), mode);
  	// 获取AQS中队列的尾部节点
    Node pred = tail;
	// 如果tail != null,现在队列有人排队
    if (pred != null) {
		// 将当前节点的prev,设置为刚才的尾部节点
        node.prev = pred;
		// 基于CAS的方式,将tail节点设置为当前节点
        if (compareAndSetTail(pred, node)) {
			// 将之前的为节点的next,设置为当前节点
            pred.next = node;
            return node;
        }
    }
	// 查看下面~
    enq(node);
    return node;
}

//-------------------------------
// 现在没人排队,我是第一个~~,  如果前面CAS失败,也会进到这个位置重新往队列尾巴塞。
private Node enq(final Node node) {
	// 死循环~~
    for (;;) {
		// 重新获取当前的tail节点为t
        Node t = tail;
        if (t == null) { 
			// 现在没人排队, 我是第一个,没头没尾,都是空
            if (compareAndSetHead(new Node()))	// 初始化一个Node作为head,而这个head没有意义。
				// 将头尾都指向了这个初始化的Node
                tail = head;
        } else {
			// 有人排队,往队列尾巴塞
			// 当前节点的上一个指向tail。
            node.prev = t;
			// 基于CAS的方式,将tail节点设置为当前节点
            if (compareAndSetTail(t, node)) {   
				// 将之前的为节点的next,设置为当前节点
                t.next = node;
                return t;
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

# 六、查看acquireQueued方法

// 已经将node加入到了双向队列中,然后执行当前方法
final boolean acquireQueued(final Node node, int arg) {
	// 标识!!!!
    boolean failed = true;
    try {
		// 标识!!!!
        boolean interrupted = false;
        for (;;) {
			// 获取当前节点的上一个节点p
            final Node p = node.predecessor();
			// 如果p是头,再次尝试获取锁资源(state从0-1,锁重入操作),成功返回true,失败返回false
            if (p == head && tryAcquire(arg)) {
				// 拿到锁资源设置head节点为当前节点,将thread,prev设置为null,因为拿到锁资源了,排队跟我没关系
                setHead(node);
                p.next = null;   // 帮助GC回收
                failed = false;  // 将标识修改为false
                return interrupted;  // 返回interrupted 
            }
			// 保证上一个节点是-1,才会返回true,才会将线程阻塞,等待唤醒获取锁资源
            if (shouldParkAfterFailedAcquire(p, node) &&
				// 基于Unsafe类的park方法,挂起线程~~~
                parkAndCheckInterrupt())    // 针对fail属性,这里是唯一可能出现异常的地方,JVM内部出现问题时,可以这么理解,fianlly代码块中的内容,执行的几率约等于0~
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// -------------------------------------
// node是当前节点,pred是上一个节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// 获取上一个节点的状态
    int ws = pred.waitStatus;
	// 如果上一个节点状态为SIGNAL,一切正常!
    if (ws == Node.SIGNAL)
        return true;
	// 如果上一个节点已经失效了
    if (ws > 0) {
        do {
			// 将当前节点的prev指针指向了上一个的上一个!
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);  // 一致找到小于等于0的
		// 将重新标识好的最近的有效节点的next
        pred.next = node;
    } else {
		// 小于等于0,不等于-1,将上一个有效节点状态修改为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// ----------------------------
// cancelAcquire方法
private void cancelAcquire(Node node) {
	// 如果当前节点为null,结束,健壮性判断
    if (node == null)
        return;
	// node不为null的前提下执行

	// 将当前node的线程置位null  , 竞争锁资源跟我没有关系了,
    node.thread = null;

	// 获取当前节点的前驱节点
    Node pred = node.prev;
	// 前驱节点的状态 > 0
    while (pred.waitStatus > 0)
		// 找到前驱中最近的非失效节点
        node.prev = pred = pred.prev;

	// 将第一个不是失效节点的后继节点声明出来
    Node predNext = pred.next;

	// 将当前节点置位失效节点。给别的Node看的。
    node.waitStatus = Node.CANCELLED;

	// 如果当前节点是尾节点,将尾节点设置为最近的有效节点(如果当前节点为尾节点的操作)
    if (node == tail && compareAndSetTail(node, pred)) {
		// 用CAS方式将尾节点的next设置null
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
		// 中间节点操作
		// 如果上一个节点不是头节点
        if (pred != head &&
			获取上一届点状态,是不是有效
            ((ws = pred.waitStatus) == Node.SIGNAL ||  // pred需要唤醒后继节点的
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);  // 尝试将pred的前驱节点的next指向当前节点的next(必须是有效的next节点)
        } else {
			// 头结点,唤醒后继节点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

# AQS相关文章

从ReentrantLock的实现看AQS的原理及应用 (opens new window)

深入理解AbstractQueuedSynchronizer (opens new window)

上次更新: 2022/07/15, 15:30:44
JVM内置锁synchronized详解
阻塞队列BlockingQueue详解

← JVM内置锁synchronized详解 阻塞队列BlockingQueue详解→

最近更新
01
AI是如何学习的
06-06
02
提示词工程实践指南
06-06
03
chatGpt提示原则
06-06
更多文章>
| Copyright © 2022-2025 Kevin | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式