Kevin's blog Kevin's blog
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档

Kevin

你可以迷茫,但不可以虚度
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档
  • JVM性能调优

  • 并发编程

    • 📚多线程并发编程JUC知识点
    • JMM模型和volatile关键字
    • CPU缓存一致性协议MESI
    • JVM内置锁synchronized详解
    • 抽象队列同步器AQS应用Lock详解
    • 阻塞队列BlockingQueue详解
      • 一、概念
      • 二、队列类型
      • 三、队列数据结构
      • 四、常见4种阻塞队列
      • ArrayBlockingQueue
        • 队列的创建:
        • 应用场景:
        • 工作原理
      • LindedBlockingQueue
        • 队列的创建:
      • DelayQueue
        • 队列的创建
        • 要求:
        • 应用场景:
      • BlockingQueue API
        • 添加元素
        • 检索元素
      • 多线程生产者 - 消费者实例
    • Tools&CountDownLatch&Semaphore原理与应用
    • Atomic&Unsafe魔法类详解
    • HashMap详解
    • Executor线程池原理与源码解读
  • MySql

  • spring

  • redis

  • zookeeper

  • rabbitMQ

  • 架构

  • 锁

  • 分库分表

  • 学习笔记
  • 并发编程
kevin
2022-05-23
目录

阻塞队列BlockingQueue详解

# 一、概念

BlockingQueue,是java.util.concurrent包提供的用于解决并发生产者-消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制,在许多场景里都可以看到这个工具的身影。

# 二、队列类型

  1. 无限队列(unbounded queue):几乎可以无限增长
  2. 有限队列(bounded queue):定义了最大容量

# 三、队列数据结构

队列实质就是一种存储数据的结构

  • 通常用链表或者数组实现
  • 具备FIFO先进先出的特性,也有双端队列(Deque)优先级队列
  • 主要操作:入队(EnQueue)和出队(DeQueue)

image-20220621145051463

# 四、常见4种阻塞队列

  • ArrayBlockingQueue 由数组支持的有界队列
  • LinkedBlockingQueue 由链接节点支持的可选有界队列
  • PriorityBlockingQueue 由优先级堆支持的无界优先级队列
  • DelayQueue 由优先级堆支持的、基于时间调度队列

# ArrayBlockingQueue

队列基于数组实现,容量大小在创建ArrayBlockingQueue对象时已定义好,数据结构:

image-20220621151645920

# 队列的创建:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();
1

# 应用场景:

在线程池中有比较多的应用,生产者消费者场景

# 工作原理

基于ReentrantLock保证线程安全,根据Coddition实现队列满时的阻塞

# LindedBlockingQueue

是一个基于链表的无界队列(理论上有界;Interger.MAX_VALUE)

# 队列的创建:

BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
1

向无限队列添加元素的所有操作都将永远不会阻塞(注意:这里不是说不会加锁保证线程安全),因此它可以增长到非常大的容量。

使用无限BlockingQueue设计生产者 - 消费者模型时最重要的是 消费者应该能够像生产者向队列添加消息一样快地消费消息。否则,内存可能会填满导致内存溢出:OutOfMemory异常

# DelayQueue

由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue实现,而无界队列基于数组扩容实现。

# 队列的创建

BlockingQueue<String> blockingQueue = new DelayQueue();
1

# 要求:

入队的对象必须要实现Delayed接口,而Delayed集成自Comparable接口

# 应用场景:

电影票:

​ 工作原理:队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

# BlockingQueue API

BlockingQueue接口的所有方法可以分为两大类:负责向队列添加元素的方法和检索这些元素的方法。在队列满/空的情况下,来自这两个组的每个方法的行为都不同。

# 添加元素

方法 说明
add() 如果插入成功则返回 true,否则抛出 IllegalStateException 异常
put() 将指定的元素插入队列,如果队列满了,那么会阻塞直到有空间插入
offer() 如果插入成功则返回 true,否则返回 false
offer(E e, long timeout, TimeUnit unit) 尝试将元素插入队列,如果队列已满,那么会阻塞直到有空间插入

# 检索元素

方法 说明
take() 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
poll(long timeout, TimeUnit unit) 检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null

在构建生产者 - 消费者程序时,这些方法是BlockingQueue 接口中最重要的构建块。

# 多线程生产者 - 消费者实例

创建一个由两部分组成的程序 - 生产者(Producer)和消费者(Consumer)。

生产者将生成一个0到100的随机数(十全大补丸的编号),并将该数字放在BlockingQueue中。我们将创建16个线程(潘金莲)用于生成随机数并使用put()方法阻塞,直到队列中有可用空间。

注意:我们需要阻止消费者线程无限期的等待元素出现在队列中。

从生产者(潘金莲)向消费者(武大郎)发出信号的方法是,不需要处理消息,而是发送消息(毒丸poison pill),需要发送尽可能多的消息(毒丸poison pill),因为有消费者(武大郎)。然后当消费者从队列中获取消息(毒丸poison pill)时,它将优雅地完成执行。

生产者代码

@Slf4j
public class NumbersProducer implements Runnable {
    private BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
            log.info("潘金莲-{}号,给武大郎的泡药!",Thread.currentThread().getId());
        }
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
            log.info("潘金莲-{}号,往武大郎的药里放入第{}颗毒丸!",Thread.currentThread().getId(),j+1);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

生成器构造函数将BlockingQueue作为参数,用于协调生产者和使用者之间的处理。我们看到方法generateNumbers()将100个元素放入队列中。

每个消费者将使用take()方法从BlockingQueue中获取一个元素,因此它将阻塞,直到队列中有一个元素。

@Slf4j
public class NumbersConsumer implements Runnable {
    private BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    public void run() {
        try {
            while (true) {
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    return;
                }
                log.info("武大郎-{}号,喝药-编号:{}",Thread.currentThread().getId(),number);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

需要注意的重要事项是队列的使用。与生成器构造函数中的相同,队列作为参数传递。我们可以这样做,是因为 BlockingQueue 可以在线程之间共享而无需任何显式同步。

既然我们有生产者和消费者,我们就可以开始我们的计划。我们需要定义队列的容量,并将其设置为 10个元素。

我们创建4 个生产者线程,并且创建等于可用处理器数量的消费者线程:

public class Main {

    public static void main(String[] args) {
        int BOUND = 10;
        int N_PRODUCERS = 16;
        int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
        int poisonPill = Integer.MAX_VALUE;
        int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
        int mod = N_CONSUMERS % N_PRODUCERS;

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);
        //潘金莲给武大郎熬药
        for (int i = 1; i < N_PRODUCERS; i++) {
            new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start();
        }
        //武大郎开始喝药
        for (int j = 0; j < N_CONSUMERS; j++) {
            new Thread(new NumbersConsumer(queue, poisonPill)).start();
        }
        //潘金莲开始投毒,武大郎喝完毒药GG
        new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer + mod)).start();
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

BlockingQueue 是使用具有容量的构造创建的。我们正在创造 4 个生产者和 N 个消费者(武大郎)。我们将我们的毒 ( poison ) 丸 ( pill )消息指定为 Integer.MAX_VALUE,因为我们的生产者在正常工作条件下永远不会发送这样的值。这里要注意的最重要的事情是 BlockingQueue 用于协调它们之间的工作。

上次更新: 2022/06/22, 18:19:16
抽象队列同步器AQS应用Lock详解
Tools&CountDownLatch&Semaphore原理与应用

← 抽象队列同步器AQS应用Lock详解 Tools&CountDownLatch&Semaphore原理与应用→

最近更新
01
AI是如何学习的
06-06
02
提示词工程实践指南
06-06
03
chatGpt提示原则
06-06
更多文章>
| Copyright © 2022-2025 Kevin | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式