抽象队列同步器AQS应用Lock详解
# AQS应用之Lock
Java并发编程核心在于java.concurrent.util包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于
AbstractQueuedSynchronizer
简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。
# ReentrantLock
ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步 手段,它的功能类似于synchronized是一种互斥锁,可以保证线程安全。而且它具有比
synchronized
更多的特性,比如它支持手动加锁与解锁,支持加锁的公平性。
使用ReentrantLock进行同步
ReentrantLock lock = new ReentrantLock(false);//false为非公平锁, true为公平锁
lock.lock() //加锁
lock.unlock() //解锁
2
3
4
# ReentrantLock如何实现synchronized不具备的公平与非公平性呢?
在ReentrantLock内部定义了一个Sync的内部类,该类继承AbstractQueuedSynchronized,对该抽象类的部分方法做了实现;并且还定义了两个子类:
FairSync
公平锁的实现NonfairSync
非公平锁的实现
这两个类都继承自Sync
,也就是间接继承了AbstractQueuedSynchronized,所以这一个ReentrantLock同时具备公平与非公平特性。
上面主要涉及的设计模式:模板模式-子类根据需要做具体业务实现
# AQS具备特性
阻塞等待队列
共享/独占
公平/非公平
可重入
允许中断
- 除了Lock外,Java.concurrent.util当中同步器的实现如
Latch
,Barrier
,BlockingQueue
等,都是基于AQS框架实现
- 一般通过定义内部类Sync继承AQS
- 将同步器所有调用都映射到Sync对应的方法
# AQS内部维护属性volatile int state (32位)
state
表示资源的可用状态
# State三种访问方式
getState()
、setState()
、compareAndSetState()
# AQS定义两种资源共享方式
Exclusive-独占
:只有一个线程能执行,如ReentrantLock
Share-共享
:多个线程可以同时执行,如Semaphore
/CountDownLatch
# AQS定义两种队列
- 同步等待队列
- 条件等待队列
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/ 唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时
# 主要实现以下几种方法
# isHeldExclusively():
该线程是否正在独占资源。只有用到condition才需要去 实现它。
# tryAcquire(int)
独占方式。尝试获取资源,成功则返回true,失败则返回 false。
# tryRelease(int)
独占方式。尝试释放资源,成功则返回true,失败则返回 false。
# tryAcquireShared(int)
共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
# tryReleaseShared(int)
共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
# 同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
# 条件等待队列
Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁
# ReentrantLock源码剖析
volatile,CAS,队列问题。
# 一、AQS
AQS本质就是Java中的AbstractQueuedSynchronizer
在AbstractQueuedSynchronizer类中,有几个属性和一个双向队列(CLH队列)
AQS就是并发包下的一个基类
CountDownLatch,ReentrantLock,信号量……
static final class Node {
// 排他锁的标识
static final Node EXCLUSIVE = null;
// 如果带有这个标识,证明是失效了~~
static final int CANCELLED = 1;
// 具有这个标识,说明后继节点需要被唤醒
static final int SIGNAL = -1;
// Node对象存储标识的地方
volatile int waitStatus;
// 指向上一个节点
volatile Node prev;
// 指向下一个节点
volatile Node next;
// 当前Node绑定的线程
volatile Thread thread;
// 返回前驱节点,如果前驱节点为null,抛出NPE
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
ReentrantLock可以实现公平锁也可以实现非公平锁,也是互斥锁以及是可重入锁
# 二、加锁
# 一、从lock进入
public void lock() {
// sync分为了公平和非公平
sync.lock();
}
2
3
4
# 二、从非公平锁进去
final void lock() {
// 通过CAS的方式尝试将state从0修改为1,如果返回true,代表修改成功,如果修改失败,返回false
if (compareAndSetState(0, 1)){
// 将一个属性设置为当前线程,这个属性是AQS的父类提供的
setExclusiveOwnerThread(Thread.currentThread());
} else {
acquire(1);
}
}
2
3
4
5
6
7
8
9
# 三、查看acquire方法
public final void acquire(int arg) {
// tryAcquire再次尝试获取锁资源,如果尝试成功,返回true
if (!tryAcquire(arg) &&
// 获取锁资源失败后,需要将当前线程封装成一个Node,追加到AQS的队列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 线程中断
selfInterrupt();
}
2
3
4
5
6
7
8
# 四、查看tryAcquire方法
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取AQS的state的值
int c = getState();
// 如果state为0,代表,尝试再次获取锁资源
if (c == 0) {
// CAS尝试修改state,从0-1,如果成功,设置ExclusiveOwnerThread属性为当前线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前占有锁资源的线程是否是当前线程
else if (current == getExclusiveOwnerThread()) {
// 将state + 1
int nextc = c + acquires;
// 如果加1后,小于0,超所锁可重入的最大值,抛出Error
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 没问题,就重新对state进行复制
setState(nextc);
// 锁重入成功
return true;
}
return false;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 五、查看addWaiter(Node.EXCLUSIVE)
// 说明前面获取锁资源失败,放到队列中等待。。。
private Node addWaiter(Node mode) {
// 创建Node类,并且设置thread为当前线程,设置为排它锁
Node node = new Node(Thread.currentThread(), mode);
// 获取AQS中队列的尾部节点
Node pred = tail;
// 如果tail != null,现在队列有人排队
if (pred != null) {
// 将当前节点的prev,设置为刚才的尾部节点
node.prev = pred;
// 基于CAS的方式,将tail节点设置为当前节点
if (compareAndSetTail(pred, node)) {
// 将之前的为节点的next,设置为当前节点
pred.next = node;
return node;
}
}
// 查看下面~
enq(node);
return node;
}
//-------------------------------
// 现在没人排队,我是第一个~~, 如果前面CAS失败,也会进到这个位置重新往队列尾巴塞。
private Node enq(final Node node) {
// 死循环~~
for (;;) {
// 重新获取当前的tail节点为t
Node t = tail;
if (t == null) {
// 现在没人排队, 我是第一个,没头没尾,都是空
if (compareAndSetHead(new Node())) // 初始化一个Node作为head,而这个head没有意义。
// 将头尾都指向了这个初始化的Node
tail = head;
} else {
// 有人排队,往队列尾巴塞
// 当前节点的上一个指向tail。
node.prev = t;
// 基于CAS的方式,将tail节点设置为当前节点
if (compareAndSetTail(t, node)) {
// 将之前的为节点的next,设置为当前节点
t.next = node;
return t;
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 六、查看acquireQueued方法
// 已经将node加入到了双向队列中,然后执行当前方法
final boolean acquireQueued(final Node node, int arg) {
// 标识!!!!
boolean failed = true;
try {
// 标识!!!!
boolean interrupted = false;
for (;;) {
// 获取当前节点的上一个节点p
final Node p = node.predecessor();
// 如果p是头,再次尝试获取锁资源(state从0-1,锁重入操作),成功返回true,失败返回false
if (p == head && tryAcquire(arg)) {
// 拿到锁资源设置head节点为当前节点,将thread,prev设置为null,因为拿到锁资源了,排队跟我没关系
setHead(node);
p.next = null; // 帮助GC回收
failed = false; // 将标识修改为false
return interrupted; // 返回interrupted
}
// 保证上一个节点是-1,才会返回true,才会将线程阻塞,等待唤醒获取锁资源
if (shouldParkAfterFailedAcquire(p, node) &&
// 基于Unsafe类的park方法,挂起线程~~~
parkAndCheckInterrupt()) // 针对fail属性,这里是唯一可能出现异常的地方,JVM内部出现问题时,可以这么理解,fianlly代码块中的内容,执行的几率约等于0~
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// -------------------------------------
// node是当前节点,pred是上一个节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
// 如果上一个节点状态为SIGNAL,一切正常!
if (ws == Node.SIGNAL)
return true;
// 如果上一个节点已经失效了
if (ws > 0) {
do {
// 将当前节点的prev指针指向了上一个的上一个!
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0); // 一致找到小于等于0的
// 将重新标识好的最近的有效节点的next
pred.next = node;
} else {
// 小于等于0,不等于-1,将上一个有效节点状态修改为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ----------------------------
// cancelAcquire方法
private void cancelAcquire(Node node) {
// 如果当前节点为null,结束,健壮性判断
if (node == null)
return;
// node不为null的前提下执行
// 将当前node的线程置位null , 竞争锁资源跟我没有关系了,
node.thread = null;
// 获取当前节点的前驱节点
Node pred = node.prev;
// 前驱节点的状态 > 0
while (pred.waitStatus > 0)
// 找到前驱中最近的非失效节点
node.prev = pred = pred.prev;
// 将第一个不是失效节点的后继节点声明出来
Node predNext = pred.next;
// 将当前节点置位失效节点。给别的Node看的。
node.waitStatus = Node.CANCELLED;
// 如果当前节点是尾节点,将尾节点设置为最近的有效节点(如果当前节点为尾节点的操作)
if (node == tail && compareAndSetTail(node, pred)) {
// 用CAS方式将尾节点的next设置null
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 中间节点操作
// 如果上一个节点不是头节点
if (pred != head &&
获取上一届点状态,是不是有效
((ws = pred.waitStatus) == Node.SIGNAL || // pred需要唤醒后继节点的
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); // 尝试将pred的前驱节点的next指向当前节点的next(必须是有效的next节点)
} else {
// 头结点,唤醒后继节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101