Tools&CountDownLatch&Semaphore原理与应用
# Semaphore
# 1. Semaphore是什么?
Semaphore字面意思是信号量的意思,它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常见的一个工具类。
# 2. 怎么使用Semaphore?
# 2.1 构造方法
public Semaphore(int permits)
public Semaphore(int permits, boolean fair)
2
- premits:表示许可线程的数量
- fair 表示公平性,如果这个设为true的话,下次执行的线程会是等待最久的线程
# 2.2 重要方法
public void acquire() throws InterruptedException
public void release()
tryAcquire(int args,long timeout, TimeUnit unit)
2
3
- acquire()表示阻塞并获取许可
- rlease()表示释放许可
# 2.3 基本使用
# 2.3.1 需求场景
资源访问, 服务限流(Hystrix里限流就有基于信号量方式)。
# 2.3.2 代码实现
public class SemaphoreRunner {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i=0;i<10;i++){
new Thread(new Task(semaphore,"yangguo+"+i)).start();
}
}
static class Task extends Thread{
Semaphore semaphore;
public Task(Semaphore semaphore,String tname){
super(tname);
this.semaphore = semaphore;
//this.setName(tname);
}
public void run() {
try {
//semaphore.acquireUninterruptibly();
semaphore.acquire();//获取公共资源
System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
Thread.sleep(5000);
semaphore.release();
/*if(semaphore.tryAcquire(500,TimeUnit.MILLISECONDS)){
System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
Thread.sleep(5000);
semaphore.release();//释放公共资源
}else{
fallback();
}*/
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void fallback(){
System.out.println("降级");
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
打印结果:
Thread-3:aquire() at time:1563096128901
Thread-1:aquire() at time:1563096128901
Thread-1:aquire() at time:1563096129903
Thread-7:aquire() at time:1563096129903
Thread-5:aquire() at time:1563096129903
Thread-3:aquire() at time:1563096129903
Thread-7:aquire() at time:1563096130903
Thread-5:aquire() at time:1563096130903
Thread-9:aquire() at time:1563096130903
Thread-9:aquire() at time:1563096131903
2
3
4
5
6
7
8
9
10
从打印结果可以看出,一次只有两个线程执行acquire(),只有线程进行release()方法后才会有别的线程执行acquire()。
# CountDownLatch
# CountDownLatch是什么?
CountDownLatch是一个非常实用的控制工具类,称之为“倒计时器”,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行,例如,应用程序的主线程希望在启动框架服务的线程已经启动所有的框架服务之后在执行。
使用场景:
zookeeper分布式锁,Jmeter模拟高并发等
# CountDownLatch如何工作?
CountDownLatch是通过一个计数器来实现的,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。
![在这里插入图片描述](http://blogimg.gkmall.top/img/classless/202206212002422.png)
API
CountDownLatch.countDown()
CountDownLatch.await();
2
特点 只能一次性使用(不能reset);主线程阻塞;某个线程中断将永远到不了屏障点,所有线程都会一直等待。
CountDownLatch应用场景例子
例子1
比如陪媳妇去看病。
医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。
现在我们是双核,可以同时做这两个事(多线程)。
假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后一起回家(回到主线程)。
代码如下:
/**
* 看大夫任务
*/
public class SeeDoctorTask implements Runnable {
private CountDownLatch countDownLatch;
public SeeDoctorTask(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
public void run() {
try {
System.out.println("开始看医生");
Thread.sleep(3000);
System.out.println("看医生结束,准备离开病房");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (countDownLatch != null)
countDownLatch.countDown();
}
}
}
/**
* 排队的任务
*/
public class QueueTask implements Runnable {
private CountDownLatch countDownLatch;
public QueueTask(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
public void run() {
try {
System.out.println("开始在医院药房排队买药....");
Thread.sleep(5000);
System.out.println("排队成功,可以开始缴费买药");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (countDownLatch != null)
countDownLatch.countDown();
}
}
}
/**
* 配媳妇去看病,轮到媳妇看大夫时
* 我就开始去排队准备交钱了。
*/
public class CountDownLaunchRunner {
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new SeeDoctorTask(countDownLatch)).start();
new Thread(new QueueTask(countDownLatch)).start();
//等待线程池中的2个任务执行完毕,否则一直
countDownLatch.await();
System.out.println("over,回家 cost:"+(System.currentTimeMillis()-now));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
例子2
//创建初始化3个线程的线程池
private ExecutorService threadPool = Executors.newFixedThreadPool(3);
//保存每个学生的平均成绩
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
private CountDownLatch countDownLatch = new CountDownLatch(3);
private void count() {
for (int i = 0; i < 3; i++) {
threadPool.execute(() -> {
//计算每个学生的平均成绩,代码略()假设为60~100的随机数
int score = (int) (Math.random() * 40 + 60);
try {
Thread.sleep(Math.round(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName() + "同学的平均成绩为" + score);
countDownLatch.countDown();
});
}
this.run();
threadPool.shutdown();
}
@Override
public void run() {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println("三人平均成绩为:" + (result / 3) + "分");
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
CyclicBarrier1 cb = new CyclicBarrier1();
cb.count();
Thread.sleep(100);
long end = System.currentTimeMillis();
System.out.println(end - now);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
最终输出结果:
其中1194ms证明了会阻塞主线程。
# CyclicBarrier
栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以重新使用的(reset()方法重置屏障点),这一点与CountDownLatch不同。
CyclicBarrier是一种同步机制允许一组线程相互等待,等到所有线程都到达一个屏障点才退出await方法,它没有直接实现AQS而是借助ReentrantLock来实现的同步机制。它是可循环使用的,而CountDownLatch是一次性的,另外它体现的语义也跟CountDownLatch不同,CountDownLatch减少计数到达条件采用的是release方式,而CyclicBarrier走向屏障点(await)采用的是Acquire方式,Acquire是会阻塞的,这也实现了CyclicBarrier的另外一个特点,只要有一个线程中断那么屏障点就被打破,所有线程都将被唤醒(CyclicBarrier自己负责这部分实现,不是由AQS调度的),这样也避免了因为一个线程中断引起永远不能到达屏障点而导致其他线程一直等待。屏障点被打破的CyclicBarrier将不可再使用(会抛出BrokenBarrierException)除非执行reset操作。
构造函数
CyclicBarrier(int parties);//int类型的参数表示有几个线程来参与这个屏障拦截,(拿上面的例子,即有几个人跟团旅游);
CyclicBarrier(int parties,Runnable barrierAction);//当所有线程到达一个屏障点时,优先执行barrierAction这个线程。
2
API
cyclicBarrier.await();
应用场景
例子1
可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。
示例代码:
public class CyclicBarrierRunner implements Runnable {
private CyclicBarrier cyclicBarrier;
private int index ;
public CyclicBarrierRunner(CyclicBarrier cyclicBarrier, int index) {
this.cyclicBarrier = cyclicBarrier;
this.index = index;
}
public void run() {
try {
System.out.println("index: " + index);
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
public void run() {
System.out.println("所有特工到达屏障,准备开始执行秘密任务");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");
Thread.sleep(5000);
for (int i = 0; i < 10; i++) {
new Thread(new CyclicBarrierRunner(cyclicBarrier, i)).start();
}
cyclicBarrier.await();
System.out.println("全部到达屏障....1");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
例子2
//创建初始化3个线程的线程池
private ExecutorService threadPool = Executors.newFixedThreadPool(3);
//创建3个CyclicBarrier对象,执行完后执行当前类的run方法
private CyclicBarrier cb = new CyclicBarrier(3, this);
//保存每个学生的平均成绩
private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
private void count() {
for (int i = 0; i < 3; i++) {
threadPool.execute(() -> {
//计算每个学生的平均成绩,代码略()假设为60~100的随机数
int score = (int) (Math.random() * 40 + 60);
try {
Thread.sleep(Math.round(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(Thread.currentThread().getName(), score);
System.out.println(Thread.currentThread().getName() + "同学的平均成绩为" + score);
try {
//执行完运行await(),等待所有学生平均成绩都计算完毕
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
threadPool.shutdown();
}
@Override
public void run() {
int result = 0;
Set<String> set = map.keySet();
for (String s : set) {
result += map.get(s);
}
System.out.println("三人平均成绩为:" + (result / 3) + "分");
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
CyclicBarrier1 cb = new CyclicBarrier1();
cb.count();
Thread.sleep(100);
long end = System.currentTimeMillis();
System.out.println(end - now);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
输出结果
# CyclicBarrier和CountDownLatch的用法与区别
CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。
CountDownLatch会阻塞主线程,CyclicBarrier不会阻塞主线程,只会阻塞子线程。某线程中断CyclicBarrier会抛出异常,避免了所有线程无限等待。
我们来从jdk作者设计的目的来看,javadoc是这么描述它们的:
CountDownLatch:
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
CyclicBarrier:
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point
2
3
4
5
从javadoc的描述可以得出:
CountDownLatch:一个或者多个线程,等待其他多个线程完成某件事情之后才能执行(主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有游客到齐才能去下一个景点);
CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再继续一起执行(是一组线程之间的相互等待,可以类比几个驴友之间的不离不弃,共同到达某个地方,再继续出发,这样反复)。
对于CountDownLatch来说,重点是“一个线程(多个线程)等待”,而其他的N个线程在完成“某件事情”之后,可以终止,也可以等待。而对于CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。
CountDownLatch是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而CyclicBarrier更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。
# Exchanger
当一个线程运行到exchange()方法时会阻塞,另一个线程运行到exchange()时,二者交换数据,然后执行后面的程序。
应用场景极少,大家了解即可
public class ExchangerTest {
public static void main(String []args) {
final Exchanger<Integer> exchanger = new Exchanger<Integer>();
for(int i = 0 ; i < 10 ; i++) {
final Integer num = i;
new Thread() {
public void run() {
System.out.println("我是线程:Thread_" + this.getName() + "我的数据是:" + num);
try {
Integer exchangeNum = exchanger.exchange(num);
Thread.sleep(1000);
System.out.println("我是线程:Thread_" + this.getName() + "我原先的数据为:" + num + " , 交换后的数据为:" + exchangeNum);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}.start();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21