Kevin's blog Kevin's blog
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档

Kevin

你可以迷茫,但不可以虚度
首页
  • AI基础
  • RAG技术
  • 提示词工程
  • Wireshark抓包
  • 常见问题
  • 数据库
  • 代码技巧
  • 浏览器
  • 手册教程
  • 技术应用
  • 流程规范
  • github技巧
  • git笔记
  • vpn笔记
  • 知识概念
  • 学习笔记
  • 环境搭建
  • linux&运维
  • 微服务
  • 经验技巧
  • 实用手册
  • arthas常用
  • spring应用
  • javaAgent技术
  • 网站
友情链接
  • 分类
  • 标签
  • 归档
  • JVM性能调优

  • 并发编程

    • 📚多线程并发编程JUC知识点
      • 并发集合
        • ConcurrentHashMap
        • ConcurrentLinkedQueue
        • java中的阻塞队列
        • CopyOnWrite
      • 线程池
        • Execuror
        • ThreadPoolExecutor
        • Callable和Future
        • ScheduledExecutorService
        • 合理配置线程池
        • 线程池监控
      • 原子操作
        • 基本类型
        • 数组
        • 引用类型
        • 原子更新字段类
        • java如何实现原子操作
      • 内存模型
        • 重排序
        • 顺序一致性
        • happens-beofre
        • as-if-serial
        • JMM的内存可见性保证
      • synchronized
        • synchronized原理
      • 线程通信
        • 共享内存
        • 消息传递
      • 其他
        • ThreadLocal
        • Fork / Join
        • disruptor并发框架
      • 锁
        • ReentranLock
        • ReentrantReadWriterlock
        • Volatile
        • 锁的内存语义
        • concurrent包的实现
        • Lock接口
        • Condition
        • LockSupport
      • 并发工具类
        • CyclicBarrier
        • CountDownLatch
        • CylicBarrier和CountDownLatch的区别
        • Semaphore
      • 并发基础
        • AQS
        • CAS
        • 线程间通信
        • ThreadLocal
    • JMM模型和volatile关键字
    • CPU缓存一致性协议MESI
    • JVM内置锁synchronized详解
    • 抽象队列同步器AQS应用Lock详解
    • 阻塞队列BlockingQueue详解
    • Tools&CountDownLatch&Semaphore原理与应用
    • Atomic&Unsafe魔法类详解
    • HashMap详解
    • Executor线程池原理与源码解读
  • MySql

  • spring

  • redis

  • zookeeper

  • rabbitMQ

  • 架构

  • 锁

  • 分库分表

  • 学习笔记
  • 并发编程
kevin
2022-05-10
目录

📚多线程并发编程JUC知识点

# 多线程并发编程JUC知识点

# 并发集合

# ConcurrentHashMap

ConcurrentHashMap (opens new window)是线程安全且高效的hahsMap.

  • 什么是HashMap (opens new window)

  • 高并发下的HashMap (opens new window)

    因为在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap.

  • 什么是ConcurrentHashMap (opens new window)

  • ConcurrentHashMap的结构

    ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

  • ConcurrentHashMap初始化

    ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMas和每个segment里的HashEntry数组来实现的。

  • ConcurrentHashMap允许多个读并发进行?

    ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点,其结构如下所示:

    static final class HashEntry<K,V> {  
         final K key;  
         final int hash;  
         volatile V value;  
         final HashEntry<K,V> next;  
     } 
    
    1
    2
    3
    4
    5
    6

    可以看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next 引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。

  • ConcurrentHashMap的size操作

    如果我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,我们是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是拿到之后可能累加前使用的count发生了变化,那么统计结果就不准了。所以最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。 因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。 那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

  • ConcurrentHashMap的get操作

    Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素,代码如下。

    public V get(Object key) {
       int hash =hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
    }
    
    1
    2
    3
    4

    get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空才会加锁重读。 那么ConcurrentHashMap的get操作是如何做到不加锁的呢? 原因是它的get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线 程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写 (有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是因为根据Java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile`替换锁的经典应用场景。 transient volatile int count; volatile V value;

  • ConcurrentHashMap的put操作

    由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment然后在Segment里进行插入操作。插入操作需要经历两个 步骤: 第一步判断是否需要对Segment里的HashEntry数组进行扩容, 第二步定位添加元素的位置,然后将其放在HashEntry数组里

# ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元 素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现,该算法在 Michael&Scott算法上进行了一些修改

  • 如何实现一个线程安全的队列?

    • 1、使用阻塞算法

      • 用一个锁(入队和出队用同一把锁)或两把锁(入队和出队用不同的锁)等范式来 实现
    • 2、使用非阻塞算法

      • 非阻塞的实现方式则 可以使用循环CAS的方式来实现
  • ConcurrentLinkedQueue

    ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和 指向下一个节点(next)的引用组成,节点与节点之间就是通过这next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

    private transient volatile Node<E> tail = head;
    
    1
    • 入队列

      • 入队列就是将入队节点添加到队列的尾部
    • 出队列

# java中的阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除操作。 支持阻塞的插入方法:意思是当队列满时,队列 会阻塞插入 元素的线程,直到队列不满。 支持阻塞 的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

抛出异常: 当队列满时 ,如果再往队列 里插入元素,会抛出IllegalStateException("Queue full")。当队列空时,从队列里获取元素会抛出throw new NoSuchElementException(); 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

  • ArrayBlockingQueue

    是一个用数组实现的有界阻塞 队列,按照先进先出(FIFO)的原则对元素进行排序。 ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true); 访问者的公平性是使用可重入锁实现的,代码如下。

    public ArrayBlockingQueue(int capacity, boolean fair) {
    	if (capacity <= 0)
      throw new IllegalArgumentException();
      this.items = new Object[capacity];
      lock = new ReentrantLock(fair);
      notEmpty = lock.newCondition();
      notFull = lock.newCondition();
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
  • LinkedBlockingQueue

    用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE.此队列按照先进先出的原则对元素进行排序。

  • PriorityBlockingQueue

    PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证 同优先级元素的顺序。

  • DelayQueue

    DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素

    • 使用场景

      • 缓存系统的设计

        可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素,表示缓存有效期到了。

      • 定时任务调度

        使用DelayQueue保存当前将会执行的任务和执行时间,一旦从DelayQueue中取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

  • SynchronousQueue

    SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。 SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。

  • LinkedTransferQueue

    LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻 塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

  • LinkedBlockingDeque

    LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队 时,也就减少了一半的竞争

# CopyOnWrite

  • CopyOnWriteArrayList

  • CopyOnWriteArraySet

  • CopyOnWrite容器介绍

    CopyOnWrite 容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy`,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我

  • 使用场景

    • 黑白名单
    • 读多写少
  • 优缺点

    • 占内存(写时复制new两个对象)、不能保证数据实时一致性

# 线程池

# Execuror

# ThreadPoolExecutor

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

  • 构造参数

    • RejectedExecutionHandler

      RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolic

      • AbortPolicy

        • 直接抛出异常
      • CallerRunsPolicy

        • 只用调用者所在线程来运行任务
      • DiscardOldestPolicy

        • 丢弃队列里最近的一个任务,并执行当前任务。
      • DiscardPolicy

        • 不处理,丢弃掉
    • corePoolSize

      corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。

    • maximumPoolSize

      maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。

    • keepAliveTime

      • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
    • TimeUnit

      • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。
    • runnableTaskQueue

      • ArrayBlockingQueue

        • 是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序
      • LinkedBlockingQueue

        • 一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
      • SynchronousQueue

        • 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
      • PriorityBlockingQueue

        • 一个具有优先级的无限阻塞队列。
    • ThreadFactory

      ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下。 new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

  • 种类

    • FixedThreadPool

      • 创建使用固定线程数FixedThreadPool的API
    • SingleThreadExecutor

      • SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
    • CachedThreadPool

      • CachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任的小程序,或者是负载较轻的服务器

# Callable和Future

# ScheduledExecutorService

# 合理配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。 任务的性质:CPU密集型任务、IO密集型任务和混合型任务 任务的优先级:高中低 任务的执行时间:长中短 任务 的依赖性;是否依赖其他系统资源 CPU密集型任务应配置尽可能小的线程,如配置NCPU+1个线程的线程池, IO密集型任务线程并不是一直在执行任务 ,则应配置尽可能多的线程,如2*NCPU 。 可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,它可以让优先级高的任务先执行。

# 线程池监控

在监控线程池的时候可以使用以下属性: taskCount: 线程池需要执行的任务数量 completedTaskCount: 线程池在 运行过程中已完成的任务数量,小于或等于taskCount。 getPoolSize;线程池的线程数量,如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减 getActiveCount; 获取活动的线程数

# 原子操作

# 基本类型

  • AtomicBoolean

  • AtomicInteger

    getAndIncrement是如何实现原子操作的呢?

    public final int getAndIncrement() {
      for (;;) {
        int current = get();
        int next = current + 1;
        if (compareAndSet(current, next))
        return current;
      	}
      }
      public final boolean compareAndSet(int expect, int update) {
      return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    源码中的for循环体的第一步先取得AtomicInteger里存储的数值 第二步对AtomicInteger的当前数值进行加1操作 第三步:调用compareAndSet方法来进行原子更新操作,该方法 先 检查当前数值是否等于current,等于意味着AtomicInteger的值没有被其他线程修改过,则将AtomicInteger的当前数值更新成next的值,如果不等compareAndSet方法会返回false,程序会进入for循环重新进行compareAndSet操作。

  • AtomicLong

# 数组

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

# 引用类型

原子更新基本类型的AtomicInteger,只能更新一个变量,如果要原子更新多个变量,就需要使用这个原子更新引用类型提供的类。

public class AtomicReferenceTest {
  public static AtomicReference<user> atomicUserRef = new
  AtomicReference<user>();
  public static void main(String[] args) {
    User user = new User("conan", 15);
    atomicUserRef.set(user);
    User updateUser = new User("Shinichi", 17);
    atomicUserRef.compareAndSet(user, updateUser);
    System.out.println(atomicUserRef.get().getName());
    System.out.println(atomicUserRef.get().getOld());
  }
  static class User {
    private String name;
    private int old;
    public User(String name, int old) {
    this.name = name;
    this.old = old;
  }
  public String getName() {
    return name;
  }
  public int getOld() {
    return old;
  }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  • AtomicReference
  • AtomicReferenceArrayFieldUpdater

# 原子更新字段类

  • AtomicIntegerFieldUpdater

    • 原子更新整型的字段的更新器
  • AtomicLongFieldUpdaater

    • 原子更新长整型字段的更新器
  • AtomicStampedReference

    原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题。

    public class AtomicIntegerFieldUpdaterTest {
      // 创建原子更新器,并设置需要更新的对象类和对象的属性
      private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.
      newUpdater(User.class, "old");
      public static void main(String[] args) {
        // 设置柯南的年龄是10岁
        User conan = new User("conan", 10);
        // 柯南长了一岁,但是仍然会输出旧的年龄
        System.out.println(a.getAndIncrement(conan));
        // 输出柯南现在的年龄
        System.out.println(a.get(conan));
      }
      public static class User {
        private String name;
        public volatile int old;
        public User(String name, int old) {
          this.name = name;
          this.old = old;
        }
        public String getName() {
          return name;
        }
        public int getOld() {
        	return old;
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

# java如何实现原子操作

  • 使用循环CAS实现原子操作

  • 锁

    锁机制保证了只有获得锁的线程才能够操作锁定的内存区域。JVM内部实现了很多种锁机制,有偏向锁、轻量级锁和互斥锁。有意思的是除了偏向锁,JVM实现锁的方式都用了循环 CAS,即当一个线程想进入同步块的时候使用循环CAS的方式来获取锁,当它退出同步块的时候使用循环CAS释放锁

# 内存模型

# 重排序

重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。

  • 数据依赖性

    如果两个操作访问统一变量,且这两个操作有一个是写操作,此时这两个操作之间就存在数据依赖性,

    • 写一个变量之后,再读这个变量
    • 写一个变量之后,再写这个变量
    • 读一个变量之后,在写这个变量
  • 重排序对多线程的影响

# 顺序一致性

  • 数据竞争与顺序一致性

  • 顺序一致性内存模型

    顺序一致性内存模型有两大特性: 一个线程中的所有操作必须按照程序的顺序来执行 (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每隔操作必须原子执行且立刻对都有的线程可见。

  • 同步程序的顺序一致性效果

  • 未同步程序的执行特性

    未同步程序 在JMM模型和顺序一致性模型有一下几个差异: 顺序一致性模型保证 单线程内的操作会按程序的顺序执行,而JMM不保证单线程内的操作会按程序的顺序执行。 顺序一致性模型保证所有线程只能看到一致的操作执行孙旭,而JMM不保证 所有线程看到一致的操作执行顺序。 JMM不保证对64位的long型和double型的写操作具有原子性,而顺序一致性模型保证对所有的内存读/写操作具有与原子性

# happens-beofre

  • 定义

    如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行熟悉怒排在第二个操作之前。 两个操作之间存在happens-before关系,并不意味java平台的具体实现必须按照happens-before关系指定的顺序来执行。如果重排序后的执行结构,与按happens-before关系来执行的结果一致,那么这种重排序并不非法(JMM允许这种重排序) as-if-serial语义保证单线程内程序的执行结果不被改变,happens-before关系保证正确同步的多线程程序的执行结构不被改变。

  • 规则

    • 1、程序次序规则:在一个单独的线程中,按照程序代码的执行流顺序,(时间上)先执行的操作happen-before(时间上)后执行的操作
    • 2、管理锁定规则:一个unlock操作happen-before后面(时间上的先后顺序,下同)对同一个锁的lock操作
    • 3、volatile变量规则:对一个volatile变量的写操作happen-before后面对该变量的读操作
    • 4、线程启动规则:Thread对选哪个的start()方法happen-before此线程的每一个动作
    • 5、线程终止规则:线程所有操作都happen-before对此线程的终止检测,可以通过Thread.join()方法结束、Thread.isAlive()的返回值等手段检测到线程已经终止执行
    • 6、线程中断规则:对线程interrupt()方法的调用happen-before发生于被中断线程的代码检测到中断时时间的发生
    • 7、对象终结规则:一个对象的初始化完成(构造函数执行结束)happen-before他的finalize()方法的开始
    • 8、传递性:如果操作A happen-before操作B,操作B happen-before操作C,那么可以得出A happen-before操作C

# as-if-serial

不管怎么重排序(编译器和处理器为了提高并行度),(单线程)程序的执行结构不能被改变。 为了遵守as-if-serial语义,编译器和处理器不会存在数据依赖关系的操作 做重排序,但是如果操作之前不存在数据依赖关系,这些操作就可能被编译器和处理器重排序。 以下面例子为例:

# JMM的内存可见性保证

# synchronized

# synchronized原理

对于普通同步方法,锁是当前实例对象  对于静态同步方法,锁是当前类Class对象 对于同步方法块,锁是Synchronized括号里配置的对象 JVM基于进入和退出Monitor对象来显示方法同步和代码块同步,但两者的实现细节不一样。代码块同步是使用monitorenter和monitorexit指令实现的,而方法同步是使用另外一种方式实现的。 monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法的结束处和异常处,JVM保证每个monitorenter必须有对应的monitorexit与之配对。

# 线程通信

# 共享内存

  • 线程之间共享程序的公共状态,通过写-读内存中的公共状态进行隐式通信

# 消息传递

  • 线程之间必须通过发送消息来显示进行通信

# 其他

# ThreadLocal

  • set(T) : s设置一个值

# Fork / Join

Fork/Join框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 Fork:就是把一个大任务切分成若干个子任务并行的执行 Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。

  • Fork / Join设计

    • 分割任务

      • 首先我们需要一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
    • 执行任务合并结果

      • 分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

# disruptor并发框架

# 锁

# ReentranLock

  • 实现重进入 (opens new window)

    重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决一下两个问题: 1、线程再次进入:锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。 2、锁的 最终释放:线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行技术自增,技术表示当前锁被重复获取的次数,而锁被释放时,计数自减去,当技术为0时表示锁已经成功释放。 成功获取锁的线程再次获取锁,只是增加了同步状态值。 如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条 件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。

  • 公平锁

    • 锁的获取顺序应该符合请求的绝对时间顺序,也就是FIFO
  • 非公平锁

    • 只要CAS设置同步状态成功,则表示当前线程获取了锁

# ReentrantReadWriterlock

# Volatile

  • volatile实现机制

    volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的“可见性”。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。volatile不会引起上下文的切换 和 调度。

  • 内存语义

    • volatile特性

      volatile变量自身具有下列特性: 可见性 对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。 原子性 对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种符合操作不具有原子性

    • volatile写-读的内存语义

      当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。 当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。

    • volatile内存语义的实现

      当第二个操作时volatile写时,不管第一个操作时什么,都不能重排序 当第一个操作时volatile读时,不管第二个操作是什么,都不能排序 当第一个操作时volatile写,第二个操作时volatile读时,不能重排序

      为了实现volatile的内存语义,编译器在生成字节码时,会在指令列中插入内存屏障来禁止特定类型的处理器重排序。下面是基于保守策略的JMM内存屏障插入策略。

      在每隔volatile写操作 的前后插入一个StoreStore屏障。 在每个volatile写操作的后面插入一个StoreLoad屏障。 在每个volatile读操作的后面插入一个LoadLoad屏障 在每个volatile读操作的后面插入一个LoadStore屏障

# 锁的内存语义

锁释放和锁获取语义 线程A释放一个锁,实质上线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做的修改的消息。 线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在这个锁之前对共享变量所做的修改的)消息 线程A释放锁,随后线程B获取这个锁,这个过程实质上线程A通过主内存向线程B发送消息

  • NonfairSync

    锁释放和锁获取语义 线程A释放一个锁,实质上线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做的修改的消息。 线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在这个锁之前对共享变量所做的修改的)消息 线程A释放锁,随后线程B获取这个锁,这个过程实质上线程A通过主内存向线程B发送消息

ReentrantLock 的实现依赖于Java同步器AbstractQueuedSynChronizer(AQS)。AQS使用一个 整型的volatile变量(命令为 state)来维护 同步状态,

ReentrantLock分为公平锁和非公平锁,首先我们分析公平锁:

使用公平锁时,加锁防范lock()调用归集为如下:

  1. ReentrantLock:lock();
  2. FairSync: lock()
  3. AbstractQueuedSynchronized:acquired(int arg)
  4. ReentrantLock: tryAcquire(int acquires)

从上面源代码中我们可以看出,加锁方法首先读volatile变量state。

在使用公平锁时,解锁方法unlock()调用轨迹如下。

  1. ReentrantLock:unlock()。
  2. AbstractQueuedSynchronizer:release(int arg)。
  3. Sync:tryRelease(int releases)。

在第3步真正开始释放锁,下面是该方法的源代码。

**公平锁在释放锁的最后写volatile变量state,在获取锁时首先读这个volatile变量。根据

volatile的happens-before规则,释放锁的线程在写volatile变量之前可见的共享变量,在获取锁

的线程读取同一个volatile变量后将立即变得对获取锁的线程可见**

# concurrent包的实现

仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式: 声明共享变量的volatile 使用CAS的原子条件更新来实现线程之间的同步 配合以volatile的读/写和CAS所具有volatile读和写的内存语义来实现线程之前的通信

# Lock接口

Lock lock = new ReentrantLock();
lock.lock();
try{

}finally{
  lock.unlock();
}
1
2
3
4
5
6
7

在finally块中释放锁,目的是保证在获取到锁之后,最终能释放锁。

# Condition

Condition接口提供了类似Object的 监视器方法,与Lock配合可以实现等待/通知模式

public class BoundedQueue<T> {
  private Object[] items;
  // 添加的下标,删除的下标和数组当前数量
  private int addIndex, removeIndex, count;
  private Lock lock = new ReentrantLock();
  private Condition notEmpty = lock.newCondition();
  private Condition notFull = lock.newCondition();
  public BoundedQueue(int size) {
  	items = new Object[size];
  }
  // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
  public void add(T t) throws InterruptedException {
  	lock.lock();
    try {
      while (count == items.length)
      notFull.await();
      items[addIndex] = t;
      if (++addIndex == items.length)
      addIndex = 0;
      ++count;
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }
  // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
  @SuppressWarnings("unchecked")
  public T remove() throws InterruptedException {
    lock.lock();
    try {
      while (count == 0)
      notEmpty.await();
      Object x = items[removeIndex];
      if (++removeIndex == items.length)
        removeIndex = 0;
        --count;
      notFull.signal();
        return (T) x;
    } finally {
    	lock.unlock();
  	}
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  • Condition实现

    CondiitonObject是同同步器AbstractQueuedSynchronizer的内部类 ,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也会是比较合理的,每个Condition对象都包含着一个队列(等待队列),该队列时Condition对象实现等待/通知功能的 关键。

    • 等待队列

      等待队列时一个FIFO的队列,在队列 的每个节点都包含一个线程引用,该 线程就是在Condition对象等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会 释放锁、构造节点加入等待队列进入 等待 状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的经静态内部类AbstractQueuedSynchronizer.Node 一个Condition包含一个等待 队列Condition拥有首节点(fristWaiter)和尾节点(lastWriter)。当前线程调用Condition.await()方法,将会以 当前线程构造节点,并将节点从尾部加入等待队列。

    • 等待

      调用Condition 的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态,当 从await()方法返回时,当前线程一定获取了Condition 相关联的锁, 如果从 队列 (同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

    • 通知

      调用Condition的signal()方法,将唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移动到同步队列中。

# LockSupport

当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作,LockSupport为构建同步组件的基础工具。 LockSupport定义了一组 以park开头的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程。 Park有停车的意思,假设线程为车辆,那么park方法代表着停车,而unpark方法则是指车辆启动离开

# 并发工具类

# CyclicBarrier

让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。 CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。示例代码如下所示:

public class CyclicBarrierTest {
  static CyclicBarrier c = new CyclicBarrier(2);
  public static void main(String[] args) {
      new Thread(new Runnable() {
          @Override
          public void run() {
              try {
                  c.await();
              } catch (Exception e) {
              }
              System.out.println(1);
          }
      }).start();
      try {
          c.await();
      } catch (Exception e) {
      }
      System.out.println(2);
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

} CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier- Action),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景

  • 应用场景

    CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。例如,用一个Excel保存了用户所有银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

    public class BankWaterService implements Runnable {
        /**
         * 创建4个屏障,处理完之后执行当前类的run方法
         */
        private CyclicBarrier c = new CyclicBarrier(4, this);
        /**
         * 假设只有4个sheet,所以只启动4个线程
         */
        private Executor executor = Executors.newFixedThreadPool(4);
        /**
         * 保存每个sheet计算出的银流结果
         */
        private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();
      
    private void count() {
        for (int i = 0; i < 4; i++) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    // 计算当前sheet的银流数据,计算代码省略
                    sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
                    // 银流计算完成,插入一个屏障
                    try {
                        c.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    
    @Override
    public void run() {
        int result = 0;
        // 汇总每个sheet计算出的结果
        for (Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
            result += sheet.getValue();
        }
        // 将结果输出
        sheetBankWaterCount.put("result", result);
        System.out.println(result);
    }
    
    public static void main(String[] args) {
        BankWaterService bankWaterCount = new BankWaterService();
        bankWaterCount.count();
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48

    }

# CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。 假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用 join()方法

public class JoinCountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
  Thread parser1 = new Thread(new Runnable() {
    @Override
    public void run() {
    }
  });
  Thread parser2 = new Thread(new Runnable() {
    @Override
    public void run() {
      System.out.println("parser2 finish");
    }
    });
      parser1.start();
      parser2.start();
      parser1.join();
      parser2.join();
    System.out.println("all parser finish");
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。其中,wait(0)表示永远等待下去 ,代码如下:

while (isAlive()) {
	wait(0);
}
1
2
3

直到join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM里 实现的,所以在JDK里看不到,大家可以查看JVM源码。 CountDownLatch实现:

public class CountDownLatchTest {
	staticCountDownLatch c = new CountDownLatch(2);
  public static void main(String[] args) throws InterruptedException {
    new Thread(new Runnable() {
    @Override
    public void run() {
    System.out.println(1);
    c.countDown();
    System.out.println(2);
    c.countDown();
  }
  }).start();
     c.await();
  System.out.println("3");
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。 如果有某个解析sheet的线程处理得比较慢,我们不可能让主线程一直等待,所以可以使用另外一个带指定时间的await方法——await(long time,TimeUnit unit),这个方法等待特定时 间后,就会不再阻塞当前线程。join也有类似的方法。

  • 一个线程调用countDown方法happen-before,另外一个线程调用await方法

# CylicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。

    CountDownLatch CyclicBarrier
    减计数方式 加计数方式
    计算为0时释放所有等待的线程 计数达到指定值时释放所有等待线程
    计数为0时,无法重置 计数达到指定值时,计数置为0重新开始
    调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞
    不可重复利用 可重复利用

# Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

  • 使用场景

    Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。

    public class SemaphoreTest {
      private static final int THREAD_COUNT = 30;
      private static ExecutorServicethreadPool = Executors.newFixedThreadPool(THREAD_COUNT);
      private static Semaphore s = new Semaphore(10);
      public static void main(String[] args) {
        for (inti = 0; i< THREAD_COUNT; i++) {
          threadPool.execute(new Runnable() {
          @Override
          public void run() {
            try {
              s.acquire();
              System.out.println("save data");
              s.release();
            } catch (InterruptedException e) {
            }
            }
          });
        }
        threadPool.shutdown();
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

# 并发基础

# AQS

  • abstractqueuedSynchronized同步器

    用来构建锁或其他同步组件的基础框架 内部通过一个int 类型的成员变量state来控制同步状态: state=0: 说明没有任何线程占有共享资源的锁。 state = 1:说明有线程正在使用共享变量,其他线程必须加入同步队列进行等待。 AQS内部通过内部类Node构成FIFO的同步队列来 完成线程获取锁的排队工作,同时利用内部类ConditionObject构建等待队列。 Condition调用wait()方法后,线程将会加入等待队列中。 Condition调用signal()方法后,线程 将从等待队列移动同步队列中进行锁竞争。 上面设计到两个队列 同步队列:当线程请求锁而等待的请求将加入同步队列等待 。 等待队列(可能多个): 通过Condition调用await()方法释放锁后,将加入等待队列。

  • 队列同步器

    同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的 方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些 模板方法将会调用使用者重写的方法

    • 以下三个方法来修改同步状态

      • getState() :获取当前同步状态
      • setState(int newState):设置当前同步状态
      • compareAndState(int expext,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性
    • 同步器可重写的方法基本为3类

      • 独占式 获取与释放 同步状态
      • 共享式获取与释放不同状态
      • 查询同步队列中的等待线程情况
  • 独占锁

    • 同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能获取锁
  • 实现方式

    • 同步队列

      同步器依赖内部的同步队列(FIFO双向队列)来完成同步状态的管理,当前线程获取状态失败时,同步器会将当前线程以及等待状态等信息构造成称为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

    • 独占式同步状态获取与释放

      独占锁概念:同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能获取锁。 通过调用同步的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步 状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除,该代码如下:

      public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
          selfInterrupt();
      }
      
      1
      2
      3
      4
      5

      上述代码主要完成同步状态获、节点构造、加入同步队列以及在同步对队列中自旋等待的相关工作。 主要逻辑如下: 首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,统一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以"以死循环"的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点 的出队或阻塞线程被中断来实现。

      上述代码通过使用compareAndSetTail(Node expect,Node update)方法来确保节点能够被线程安全添加。试想一下:如果使用一个普通的LinkedList来维护节点之间的关系,那么当一个线程获取了同步状态,而其他多个线程由于调tryAcquire(int arg)方法获取同步状态失败而并发地被添加到LinkedList时,LinkedList将难以保证Node的正确添加,最终的结果可能是节点的数量有偏差,而且顺序也是混乱的。

      在enq(final Node node)方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node)方法将并发添加节点的请求通过CAS变得“串行化”了。

      在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?原因有两个,如下。

      第一,头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。

      第二,维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如图5-4所示。

      由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态。可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)。

      前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件而言,代表着当前线程获取了锁。

      当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。该方法代码如代码如下所示。

      该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node)方法使用 LockSupport(在后面的章节会专门介绍)来唤醒处于等待状态的线程

    • 共享式同步状态获取与释放

      共享锁(读锁):统一时刻可以有多个线程获取到同步状态

      在acquireShared(int arg)方法中,同步器调用tryAcquireShared(int arg)方法尝试获取同步状态,tryAcquireShared(int arg)方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是

      tryAcquireShared(int arg)方法返回值大于等于0。可以看到,在doAcquireShared(int arg)方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。与独占式一样,共享式获取也需要释放同步状态,通过调用releaseShared(int arg)方法可以释放同步状态,该方法代码如代码清单所示。

# CAS

  • Compare And Swap

    CAS 的全称是 Compare And Swap 即比较交换,其算法核心思想如下 函数:CAS(V,E,N) 参数:V 表示要更新的变量 E 预期值 N 新值 如果 V 值等于 E 值,则将 V 的值设为 N。若 V 值和 E 值不同,则说明已经有其他线程做了 更新,则当前线程什么都不做。通俗的理解就是 CAS 操作需要我们提供一个期望值,当期 望值与当前线程的变量值相同时,说明还没线程修改该值,当前线程可以进行修改,也就是 执行 CAS 操作,但如果期望值与当前线程不符,则说明该值已被其他线程修改,此时不执 行更新操作,但可以选择重新读取该变量再尝试再次修改该变量,也可以放弃操作

  • 缺陷

    • ABA

      因为CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值 原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了,。

      • ABA解决方案

        解决思路就是使用版本号。在变量前面追加版本号,每次变量更新的时候把版本号+1,那么A->B->A 就会变成1A->2B->3A。从JDK1.5 开始,JDK的Atomic包里提供了一个类Atomic包里提供了一个AtomicStampedReference来解决ABA问题,

        public boolean compareAndSet(
        	V expectedReference, // 预期引用
        	V newReference, // 更新后的引用
        	int expectedStamp, // 预期标志
        	int newStamp // 更新后的标志
        )
        
        1
        2
        3
        4
        5
        6

        compareAndSet方法的作用首先检查当前引用是否等于预期 引用,并且检查当前标志是否 等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

    • 循环时间长开销大

      自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销

    • 只能保证一个共享变量的原子操作

      当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子 性,这个时候就可以用锁。还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。从Java 1.5开始,JDK提供了AtomicReference类来保证引用对之间的原子性,就可以把多个变量放在一个对象里来进行CAS操作。

# 线程间通信

  • Volatile和Synchronized

    volatile:用来修饰子字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而 对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。 synchronized;可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。

    synchronized原理:

    上面的class信息中,对于同步块的实现使用了monitorenter 和 monitorexit指令,而同步方法则是依靠方法上使用ACC_SYNCHRONIZED来完成的。无论哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程取到synchronized多保护 对象的监视器。

    任何一个对象都有自己的监视器,当这个对象由同步块后者这个对象的同步方法调用时,执行方法的线程必须取到该对象的监视器 才能进入同步块或者同步方法,而没有获取到的监视器的线程将会阻塞在同步块和同步方法 的入口处,进入BLOCKED状态。

  • 等待/通知

    **等待方遵循如下原则 **

    1. 获取对象的锁
    2. 如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条
    3. 条件满足 则执行对应的逻辑 对应的伪代码
    synchronized(对象) {
    while(条件不满足) {
    	对象.wait();
    }
    	对应的处理逻辑
    }
    
    1
    2
    3
    4
    5
    6

    通知方遵循如下原则

    1. 获得对象的锁。
    2. 改变条件。
    3. 通知所有等待在对象上的线程。 对应的伪代码如下。
    synchronized(对象) {
    	改变条件
    	对象.notifyAll();
    }
    
    1
    2
    3
    4
    • 使用wait()、notify()、和notiryAll()时需要先对调用对象加锁
    • 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列
    • notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifyAll的线程释放锁之后,等待的线程才有机会从wait()返回。
    • notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll()方法将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为BLOCKING
    • 从wait()方法返回的前提 是获得了调用对象的锁
  • Thread.join()

    如果一个线程A执行了thread.join()语句,其含义是当前线程 A等待thread线程终止之后 才从thread.join()返回,

    public class Join {
    	public static void main(String[] args) throws Exception {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
          // 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
          Thread thread = new Thread(new Domino(previous), String.valueOf(i));
          thread.start();
          previous = thread;
      	}
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
      }
      static class Domino implements Runnable {
        private Thread thread;
        public Domino(Thread thread) {
        	this.thread = thread;
        }
        public void run() {
          try {
            thread.join();
          } catch (InterruptedException e) {
          }
          System.out.println(Thread.currentThread().getName() + " terminate.");
        }
      }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26

# ThreadLocal

可以通过set(T)方法来设置一个值,在当前线程下再通过get()方法获取原先设置的值;

public class Profiler {
  // 第一次get()方法调用时会进行初始化(如果set方法没有调用),每个线程会调用一次
  private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {
    protected Long initialValue() {
      return System.currentTimeMillis();
    }
  };
  public static final void begin() {
  	TIME_THREADLOCAL.set(System.currentTimeMillis());
  }
  public static final long end() {
  	return System.currentTimeMillis() - TIME_THREADLOCAL.get();
  }
  public static void main(String[] args) throws Exception {
    Profiler.begin();
    TimeUnit.SECONDS.sleep(1);
    System.out.println("Cost: " + Profiler.end() + " mills");
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
上次更新: 2022/08/11, 13:45:23
JVM 内存分析工具 MAT及实践
JMM模型和volatile关键字

← JVM 内存分析工具 MAT及实践 JMM模型和volatile关键字→

最近更新
01
AI是如何学习的
06-06
02
提示词工程实践指南
06-06
03
chatGpt提示原则
06-06
更多文章>
| Copyright © 2022-2025 Kevin | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式