Java 并发编程
进程与线程
进程与线程概述
进程
- 程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至 CPU,数据加载至内存。在 指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的。
- 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
- 进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)。
线程
- 一个进程之内可以分为一到多个线程。
- 一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给 CPU 执行 Java 中,线程作为最小调度单位,进程作为资源分配的最小单位。
- 线程在 windows 中进程是不活动的,只是作为线程的容器。
两者对比
进程基本上相互独立的,而线程存在于进程内,是进程的一个子集。
进程拥有共享的资源,如内存空间等,供其内部的线程共享。
进程间通信较为复杂:
- 同一台计算机的进程通信称为 IPC(Inter-process communication)。
- 不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如 HTTP。
线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量。
线程更轻量,线程上下文切换成本一般上要比进程上下文切换低。
并发与并行
- 并发(concurrent)是同一时间应对(dealing with)多件事情的能力。
- 并行(parallel)是同一时间动手做(doing)多件事情的能力。
Java 线程
创建和运行线程
直接使用 Thread:
// 重写run方法,并指定线程名称 Thread t = new Thread() { @Override public void run() { log.debug("running"); } }; t.setName("t1"); // 启动线程 t.start();
使用 Runnable 表示可运行的任务,将任务和线程分开,让代码耦合度降低:
// 创建Runnable对象,表示一个可执行的任务 Runnable runnable = new Runnable() { @Override public void run() { log.debug("hello"); } }; // 创建线程,绑定对应方法后启动线程 Thread t = new Thread(runnable, "t2"); t.start();
用 Runnable 更容易与线程池等高级 API 配合,让任务类脱离了 Thread 继承体系,更灵活。
使用 FutureTask 接收 Callable 类型参数,适用于有返回结果的情况:
// 创建任务对象,返回结果 FutureTask<Integer> task = new FutureTask<>(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("running..."); Thread.sleep(2000); return 21; } }); // 启动线程 Thread t = new Thread(task, "t3"); t.start(); // 获取结果,主线程在这里会同步等待 Integer result = task.get(); log.debug("result: {}", result);
查看进程和线程的方法
jps
命令查看所有 Java 进程。jstack <PID>
查看某个 Java 进程(PID)的所有线程状态。jconsole
来查看某个 Java 进程中线程的运行情况(图形界面)。
线程运行的原理
栈与栈帧
Java Virtual Machine Stacks (Java 虚拟机栈)
我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟机就会为其分配一块栈内存。
每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存。
每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法。
线程上下文切换
因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码:
- 线程的 CPU 时间片用完。
- 垃圾回收。
- 有更高优先级的线程需要运行。
- 线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法。
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的:
- 状态包括程序计数器、虚拟机栈中每个栈帧的信息,如局部变量、操作数栈、返回地址等。
- Context Switch 频繁发生会影响性能。
常见方法
start 与 run
start :启动一个新线程,在新的线程运行 run 方法中的代码。start 方法只是让线程进入就绪状态,里面的代码不一定立刻运行(CPU 的时间片还没分给它),每个线程对象的 start 方法只能调用一次,如果调用了多次会出现 IllegalThreadStateException。
run:新线程启动后会调用的方法。如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法,否则默认不执行任何操作。但可以创建 Thread 的子类对象,来覆盖默认行为。
注意:主线程中也可以直接调用 run 来执行对应的方法的,但是直接调用 run 是在主线程中执行了 run,没有启动新的线程。使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码。
sleep 与 yield
- sleep:让当前执行的线程休眠 n 毫秒,休眠时让出 CPU 的时间片给其他线程。调用 sleep 会让当前线程从 Running 进入 Timed Waiting 状态(阻塞)。其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException(线程被唤醒后会走 catch 块里面的代码)。睡眠结束后的线程未必会立刻得到执行。建议使用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性。
- yield:提示线程调度器让出当前线程对 CPU 的使用。调用 yield 会让当前线程从 Running 进入 Runnable 就绪状态,然后调度执行其它线程。具体的实现依赖于操作系统的任务调度器。
注意:sleep 和 yield 虽然从表面上看都是让线程暂时先不要执行,但是从线程状态的改变上来看,sleep 是将线程转换成阻塞状态,这种状态下,任务调度器是不会去主动调度这个线程的。但是,yield 是让线程转换成就绪状态,虽然此时线程没有运行,但是任务调度器还是会有可能再一次让该线程继续执行的。
sleep 的一个小应用
在没有利用 cpu 来计算时,不要让 while(true)
空转浪费 cpu,这时可以使用 yield 或 sleep 来让出 cpu 的使用权给其他程序:
while (true) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
- 可以用 wait 或者条件变量来达到类似的效果。
- 不同的是,后两种需要加锁,并且需要相应的唤醒操作,一般适用于要进行同步的场景。
- sleep 适用于无需锁同步的场景。
join
- join:等待线程运行结束,哪一个线程调用的 join,就意味着等待哪一个线程调用结束。
观察如下代码:
Thread t = new Thread(() -> {
log.debug("start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
r = 10;
}, "t1");
t.start();
log.debug("result: {}", r);
因为线程 t 睡眠 1s 的缘故,导致 r 最终打印并不是 10。假设我需要让主线程打印的 r 为 10,(换句话讲我要让主线程同步等待线程 t 的操作)可以利用 join 方法:
Thread t = new Thread(() -> {
log.debug("start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
r = 10;
}, "t1");
t.start();
// 调用t线程的join方法,同步等待t线程的操作结束
t.join();
log.debug("result: {}", r);
此外,我们还可以在 join 方法中传入一个 long 值(join(long n)
),表示线程等待的最长时间。
interrupt
我们知道,sleep、wait、join 方法都会让线程进入阻塞状态,而 interrupt 则会清空这种状态,并且设置打断标记为 false:
Thread t = new Thread(() -> {
log.debug("sleep...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
// 输出false
log.debug("isInterrupted: {}", Thread.currentThread().isInterrupted());
throw new RuntimeException(e);
}
}, "t1");
t.start();
Thread.sleep(3000);
log.debug("interrupt");
t.interrupt();
打断正常运行的线程,只靠调用 interrupt 方法是没办法真正打断该线程的运行的,需要在线程内部自己写逻辑判断代码,判断打断状态来决定是否真正需要打断该线程的执行:
Thread t = new Thread(() -> {
while (true) {
// 判断打断标记是否为true,如果是,则退出循环,终止操作
if (Thread.currentThread().isInterrupted()) {
break;
}
}
}, "t1");
t.start();
log.debug("interrupt...");
t.interrupt();
打断 park 状态的线程,会让其退出 park 状态,但是打断标记还是 true:
Thread t = new Thread(() -> {
log.debug("park...");
LockSupport.park();
log.debug("unpark...");
// 下方代码输出为true
log.debug("isInterrupted: {}", Thread.currentThread().isInterrupted());
});
t.start();
Thread.sleep(1000);
t.interrupt();
值得注意的是,当线程打断标记为 true 时,再调用 park 方法,是没办法让线程进入 park 状态的,除非我们手动设置打断标记为 false。
终止模式——两阶段终止
两阶段终止(Two Phase Termination)模式,目的是解决在一个线程 T1 中如何 “优雅” 终止线程 T2 的问题。这里的 “优雅”,指的是如何给予 T2 执行善后处理操作的机会。
错误思路:
- 使用线程对象的 stop 方法停止线程:stop 方法会真正杀死线程,如果这时线程锁住了共享资源,那么当它被杀死后就再也没有机会释放锁, 其它线程将永远无法获取锁。(该方法目前已经废弃,不推荐使用)
- 使用
System.exit(int)
方法停止线程:目的仅是停止一个线程,但这种做法会让整个程序都停止。
假设这样一个场景:我们需要设置一个监控线程,每隔 1s 去监控一些事件,当发生异常的时候,需要打断该线程的执行,并执行一些善后操作,我们可以绘制出如下流程图:
graph TD w("while(true)") --> a a("有没有打断?") -- 是 --> b(处理善后操作) b --> c(结束循环) a -- 否 --> d(睡眠1s) d -- 无异常 --> e(执行监控记录) d -- 有异常 --> i(设置打断标记) i --> w e --> w
使用 isInterrupt 实现
两阶段终止如下:
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
// 启动监控线程
twoPhaseTermination.start();
// 主线程等待3s后关闭监控线程
Thread.sleep(3500);
twoPhaseTermination.stop();
}
}
@Slf4j
class TwoPhaseTermination {
private Thread monitor;
// 启动监控线程
public void start() {
monitor = new Thread(() -> {
while (true) {
Thread currentThread = Thread.currentThread();
if (currentThread.isInterrupted()) {
log.debug("善后操作...");
break;
}
try {
Thread.sleep(1000);
log.debug("执行监控记录");
} catch (InterruptedException e) {
// 如果睡眠时被打断,则会导致打断标记设为false,退出不了循环,需要我们手动置1,这里不能抛异常,否则善后操作的代码无法执行。(print异常倒是可以)
currentThread.interrupt();
}
}
});
monitor.start();
}
// 停止监控线程
public void stop() {
monitor.interrupt();
}
}
利用停止标记实现
待续。。。
主线程与守护线程
默认情况下,Java 进程需要等待所有线程都运行结束,才会结束。有一种特殊的线程叫做守护线程,只要其它非守 护线程运行结束了,即使守护线程的代码没有执行完,也会强制结束。
Thread t = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
break;
}
}
log.debug("结束1");
}, "t1");
// 在启动之前把t1设置为守护线程
t.setDaemon(true);
t.start();
Thread.sleep(1000);
log.debug("结束2"); // 主线程(非守护线程)结束了,t1自然也会结束(本来应该一直运行的)
注意:
- 垃圾回收器线程就是一种守护线程。
- Tomcat 中的 Acceptor 和 Poller 线程都是守护线程,所以 Tomcat 接收到 shutdown 命令后,不会等待它们处理完当前请求。
线程的五种状态
线程的五种状态是从操作系统层面上讲的:
flowchart LR CPU --> 运行状态 初始状态 --> 可运行状态 --> 运行状态 --> 终止状态 运行状态 --> 可运行状态 运行状态 --> 阻塞状态 阻塞状态 --> 可运行状态
- 初始状态:仅是在语言层面创建了线程对象,还未与操作系统线程关联。
- 可运行状态(就绪状态):指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行。
- 运行状态:指获取了 CPU 时间片运行中的状态。当 CPU 时间片用完,会从运行状态转换至可运行状态,会导致线程的上下文切换。
- 阻塞状态:如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入阻塞状态。等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至可运行状态。与可运行状态的区别是,对阻塞状态的线程来说只要它们一直不唤醒,调度器就一直不会考虑调度它们。
- 终止状态:表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态。
线程的六种状态
线程的六种状态是从 Java API 的层面来考虑的:
- NEW:线程刚被创建,但是还没有调用
start
方法。 - RUNNABLE:当调用了
start
方法之后,注意,Java API 层面的 RUNNABLE 状态涵盖了操作系统层面的 可运行状态、运行状态和阻塞状态(由于 BIO 导致的线程阻塞,在 Java 里无法区分,仍然认为是可运行)。 - BLOCKED、WAITING、TIMED_WAITING:都是 Java API 层面对阻塞状态的细分,后面会在状态转换一节详细讲述。
- TERMINATED:当前线程代码运行结束。
共享模型之管程
共享问题
我们知道,多个线程操作同一个共享变量的时候,有时候会出现线程安全问题。例如如下代码中,两个线程对初始值为 0 的静态变量一个做自增一个做自减,各做 5000 次,实际结果可能不为 0:
private static int counter = 0;
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; ++i) {
// 临界区
++counter;
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; ++i) {
// 临界区
--counter;
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}", counter);
产生以上问题的原因,便是自增和自减的操作在指令码的层面上讲实际上被分成了多步,包括:从内存中读,自增 / 自减,往内存中写。这样,一旦这些步骤中某一步在执行时被打断而后将执行权交给另一个线程,就会导致问题发生。
概念解释:
- 临界区(Critical Section):一段代码块内如果存在对共享资源的多线程读写操作,称这段代码块为临界区。
- 竞态条件(Race Condition):多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件。
synchronized
synchronized (对象锁)是解决上述问题的一种方案。它采用互斥的方式让同一 时刻至多只有一个线程能持有对象锁,其它线程再想获取这个对象锁时就会阻塞住。这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换。
注意:虽然 java 中互斥和同步都可以采用 synchronized 关键字来完成,但它们还是有区别的:
- 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区代码。
- 同步是由于线程执行的先后、顺序不同、需要一个线程等待其它线程运行到某个点。
我们需要往临界区的代码上加锁:
private static int counter = 0;
private static final Object lock = new Object();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; ++i) {
synchronized (lock) {
++counter;
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; ++i) {
synchronized (lock) {
--counter;
}
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}", counter);
synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。
使用面向对象思想优化
上述的代码对于 counter 的操作实际上还是面向过程,对于保证 counter 自增自减的操作还是需要我们手动加锁。我们不妨把对 counter 的各种操作直接封装成一个类,并对外提供原子性方法:
class MyCounter {
private int counter = 0;
// 封装自增
public void increment() {
synchronized (this) {
++counter;
}
}
// 封装自减
public void decrement() {
synchronized (this) {
--counter;
}
}
// 返回counter时也需要加锁
public int getCounter() {
synchronized (this) {
return counter;
}
}
}
main 方法中即可做出对应修改:
MyCounter counter = new MyCounter();
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; ++i) {
counter.increment();
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; ++i) {
counter.decrement();
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}", counter.getCounter());
方法上的 synchronized
class Test {
public synchronized void test() {
}
}
// 上述等价于
class Test {
public void test() {
synchronized (this) {
}
}
}
class Test {
public synchronized static void test() {
}
}
// 上述等价于
class Test {
public static void test() {
synchronized (Test.class) {
}
}
}
常见的线程安全类
常见的线程安全类有:
- String
- Integer
- StringBuffer
- Random
- Vector
- Hashtable
- java.util.concurrent 包下的类
这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。但是需要注意:多个方法的组合并不是原子的。例如下方的代码就不是原子的:
Hashtable table = new Hashtable();
// 线程1,线程2
if (table.get("key") == null) {
table.put("key", value);
}
sequenceDiagram participant t1 as 线程1 participant t2 as 线程2 participant table t1 ->> table : get("key") == null t2 ->> table : get("key") == null t2 ->> table : put("key", v2) t1 ->> table : put("key", v1)
如上述时序图所述,旧数据 v1 会把新数据 v2 覆盖掉。
线程安全分析
成员变量和静态变量
判断成员变量和静态变量是否线程安全的方法:
- 如果它们没有共享,则线程安全。
- 如果被共享了,根据它们的状态是否能够改变,又分两种情况:
- 如果只有读操作,则线程安全。
- 如果有读写操作,则这段代码是临界区,可能回导致线程安全问题。
局部变量
判断局部变量是否线程安全的方法:
- 局部变量是线程安全的。
- 但局部变量引用的对象则未必:
- 如果该对象没有逃离方法的作用访问,它是线程安全的。
- 如果该对象逃离方法的作用范围,需要考虑线程安全。
例如下述代码:
public static void test1() {
int i = 10;
i++;
}
每个线程调用 test1()
方法时局部变量 i,会在每个线程的栈帧内存中被创建多份,因此不存在共享。
线程安全分析实例
在 Web 应用中,一般会由 Servlet 来处理我们的请求,当然了,这个 Servlet 只有一份,但是要处理的请求却是多个,那么在这种情况下,其内部的成员变量线程是否安全呢?我们来看以下代码:
public class MyServlet extends HttpServlet {
// map线程不安全,因为HashMap不具备线程安全性质
Map<String, Object> map = new HashMap<>();
// s1和s2都是线程安全的,因为String类型是不可变的
String s1 = "...";
final String s2 = "...";
// d1是线程不安全的
Date d1 = new Date();
// d2也是线程不安全的,虽然被final修饰了,但是无法保证d2对象内部的属性不变
final Date d2 = new Date();
}
public class MyServlet extends HttpServlet {
private UserService userSerivce = new UserSerivceImpl();
}
public class UserServiceImpl implements UserService {
// count也是不安全的,在实际开发中应该避免在实现类中写出这种类型的代码
private int count = 0;
public void update() {
count++;
}
}
Monitor 概念
Java 对象头
以 32 位 Java 虚拟机为例,普通对象的 Java 对象头结构有:
|<--------------------Object Header (64 bits)----------------->|
|------------------------------------|-------------------------|
| Mark Word (32 bits) | Klass Word (32 bits) |
|------------------------------------|-------------------------|
数组对象的 Java 对象头结构有:
|<--------------------------------Object Header (96 bits)------------------------>|
|--------------------------------|-----------------------|------------------------|
| Mark Word(32bits) | Klass Word(32bits) | array length(32bits) |
|--------------------------------|-----------------------|------------------------|
其中,Klass Word 记录了实例对象的指针,Mark Word 记录了其他详细的信息。Mark Word 的具体结构为:
|-------------------------------------------------------|--------------------|
| Mark Word (32 bits) | State |
|-------------------------------------------------------|--------------------|
| hashcode:25 | age:4 | biased_lock:0 | 01 | Normal |
|-------------------------------------------------------|--------------------|
| thread:23 | epoch:2 | age:4 | biased_lock:1 | 01 | Biased |
|-------------------------------------------------------|--------------------|
| ptr_to_lock_record:30 | 00 | Lightweight Locked |
|-------------------------------------------------------|--------------------|
| ptr_to_heavyweight_monitor:30 | 10 | Heavyweight Locked |
|-------------------------------------------------------|--------------------|
| | 11 | Marked for GC |
|-------------------------------------------------------|--------------------|
64 位虚拟机 Mark Word 结构:
|---------------------------------------------------------------------|----------|
| Mark Word (64 bits) | State |
|--------------------------------------------------------------------------------|
| unused:25 | hashcode:31 | unused:1 | age:4 | biased_lock:0 | 01 | Normal |
|---------------------------------------------------------------------|----------|
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | 01 | Biased |
|---------------------------------------------------------------------|----------|
| ptr_to_lock_record:62 | 00 | Lightweight Locked |
|-----------------------------------------------------------|--------------------|
| ptr_to_heavyweight_monitor:62 | 10 | Heavyweight Locked |
|-----------------------------------------------------------|--------------------|
| | 11 | Marked for GC |
|-----------------------------------------------------------|--------------------|
Monitor 工作原理
Monitor 被翻译为监视器或管程。
每个 Java 对象都可以关联一个 Monitor 对象,如果使用 synchronized 给对象上锁(重量级)之后,该对象头的 Mark Word 中就被设置指向 Monitor 对象的指针。Monitor 对象是操作系统层面的,其内部的成员变量大致可以如下表示:
class Monitor {
private Set<Thread> waitSet;
private List<Thread> entryList;
private Thread owner;
}
假设对临界区代码使用重量级锁进行保护:
- 当一个线程执行一个临界区代码片段时,会先查看关联的锁对象的 Mark Word 字段,通过 Mark Word 字段存储的
ptr_to_heavyweight_monitor
指针找到 Monitor 对象。 - 此时线程会查看 Monitor 对象中 owner 是否为空,如果为空,则对其进行占用,表示该线程已经占有锁。
- 如果线程查看 Monitor 对象中 owner 不为空,则会进入 entryList 进行等待,该线程会被阻塞。
- 当 owner 上的线程执行完代码后,就会唤醒 entryList 中的所有线程,让它们再去争抢 owner 所有权。
- waitSet 存储的是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲 wait-notify 时会分析。
synchronized 进阶
在 Java 中,synchronized
关键字用于实现同步,它可以保证同一时刻只有一个线程执行某个代码块,从而避免并发问题。Java 在实现 synchronized
时采用了多种锁优化策略,以减少线程在获取锁时的性能开销。
JVM 中为了提升锁的性能,引入了三种锁模式:
- 偏向锁(Biased Locking)。
- 轻量级锁(Lightweight Locking)。
- 重量级锁(Heavyweight Locking)。
轻量级锁
轻量级锁的应用场景:如果对于一个临界区虽然有多个线程访问,但是多线程访问的时间是错开的(也就是没有竞争或者竞争较小),那么可以使用轻量级锁来进行优化。
轻量级锁对使用者是透明的,即语法还是 synchronized。
static final Object obj = new Object();
public static void method1() {
// synchronized优先使用轻量级锁,当竞争激烈时自动升级为重量级锁
synchronized (obj) {
method2();
}
}
public static void method2() {
// 调用method2时对象锁为obj,method2执行时对象锁依旧为obj,这种情况叫锁重入
synchronized (obj) {
// ...
}
}
轻量级锁工作流程:
- 当一个线程要来访问被 synchronized 修饰的临界区时,如果加的是轻量级锁,线程会先创建一个 LockRecord 对象,该对象包含一条
lock_record
信息和指向对象锁(例如上述代码中,对象锁为obj
)的指针。 - 线程将
lock_record
与对象锁的 Mark Word(判断为 Normal 后)进行替换(替换的时候使用 cas,compare and swap,保证原子性),将对象锁的 Mark Word 替换为lock_record
信息(Lightweight Locked)。 - 如果 cas 替换成功,表示线程成功拿到轻量级锁。
- 如果 cas 替换失败,有如下两种可能性:
- 其他线程已经持有了对该 Object 的轻量级锁,这个时候表明有竞争,进入锁膨胀过程。
- 替换的时候发现
lock_record
信息来自于自身,这个时候表明进行锁重入,那么该线程会再次创建一个 LockRecord 作为锁重入的计数,只不过这个时候 LockRecord 中的lock_record
信息为 null。
- 当退出临界区代码块时,如果有
lock_record
为 null 的锁记录,表示有重入,这时重置锁记录,表示重入计数减一。 - 当退出临界区代码块时,如果
lock_record
不为 null,这个时候使用 cas 对锁对象 Mark Word 的值进行恢复:- 成功,则解锁成功。
- 失败(判断出 Mark Word 不为 Lightweight Locked),则说明轻量级锁已经进行锁膨胀升级为重量级锁,则这时候进入重量级锁解锁流程。
锁膨胀
如果在尝试加轻量级锁的过程中,CAS 操作无法成功,这时一种情况就是有其它线程为此对象加上了轻量级锁(有 竞争),这时需要进行锁膨胀,将轻量级锁变为重量级锁。
我们假设,线程 t0 已经成功加了一个轻量级锁。而当 t1 进行轻量级加锁,在进行 cas 交换时就会发现对象锁的 Mark Word 字段不是 Normal 而是已经变成 Lightweight Locked。这个时候触发锁膨胀过程:
- t1 会为对象锁(即 obj)申请一个 Monitor 锁,并更换 obj 的 Mark Word 字段为 Heavyweight Locked,其中的
ptr_to_heavyweight_monitor
指向这个申请好的 Monitor。 - 接着,把 Monitor 的 owner 设置为 t0,t1 自己则进入 entryList 中进行阻塞。
通过上述步骤,t1 完成了锁膨胀。而当 t0 想要解锁时,判断 obj 的 Mark Word 已经不为 Lightweight Locked 了,则进入重量级锁解锁流程:
- 读取 Mark Word 的
ptr_to_heavyweight_monitor
字段找到 Monitor。 - 设置 owner 为 null,并唤醒 entryList 中的所有线程,让它们继续对该锁进行竞争。
自旋优化
重量级锁竞争的时候,还可以使用自旋来进行优化(阻塞会导致上下文切换,比较消耗性能):
当一个线程发现已经有别的线程持有重量级锁时,这个时候,该线程不急着进入 entryList 阻塞,而是采用自旋的方式,先轮询几次。
如果轮询的时候,这个锁刚好被解开,则该线程直接获取该重量级锁。
如果轮询了一定次数后,锁还没有解开,这个时候该线程再去进入 entryList 中阻塞。
不过,对于自旋,要注意的是:
- 自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。
- 在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会 高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。
- Java 7 之后不能控制是否开启自旋功能。
偏向锁
轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作。
Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现 这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有。
偏向状态
一个对象创建时:
- 如果开启了偏向锁(默认开启),那么对象创建后,Mark Word 值为 0x05,即最后 3 位为 101,这时它的 thread、epoch、age 都为 0。
- 偏向锁是默认是延迟的,不会在程序启动时立即生效,如果想避免延迟,可以加 VM 参数
-XX:BiasedLockingStartupDelay=0
来禁用延迟。 - 如果没有开启偏向锁,那么对象创建后,markword 值为 0x01,即最后 3 位为 001,这时它的 hashcode、 age 都为 0,第一次用到 hashcode 时才会赋值。
偏向锁的撤销
当只有一个线程访问临界区时,这个时候对象锁为偏向锁,有三种情况会撤销对象的偏向锁:
- 调用对象的
hashCode
方法,偏向锁区别于轻量级锁和重量级锁,它没有对对象 HashCode 的保存,所以当调用对象的hashCode
方法时,MarkWord 字段会被覆盖为 Normal,目的是为了给 HashCode 留下存储空间。 - 当有其它线程使用偏向锁对象时,会将偏向锁撤销,升级为轻量级锁。
- 调用
wait/notify
时,因为该机制只有重量级锁才有,所以也会导致偏向锁的撤销。
批量重偏向
对于同一个对象的多个实例,很有可能都会被当成锁来处理。假设 A 类有实例对象 10 个,t1 线程遍历这 10 个对象使用它们进行上锁,这个时候,10 个对象都会偏向 t1 加锁。
这个时候,来了一个 t2 线程,同样遍历这 10 个对象,并使用它们上锁,这个时候,10 个对象上面的偏向锁就会依次被撤销,形成轻量级锁。
假设 A 类的实例对象不止有 10 个(假设有好多好多个),每一个对象都被上述的操作所影响,进而导致偏向锁的撤销。当撤销的次数超过 20 次的时候,jvm 会认为无需再用轻量级锁,可以使用偏向锁,就不会撤销偏向锁,而是将剩下的偏向锁偏向的对象更改为 t2。
批量撤销
当撤销偏向锁阈值超过 40 次后,jvm 会这样觉得根本就不该偏向。于是整个类的所有对象都会变为不可偏向的,新建的对象也是不可偏向的。
锁消除
JVM 中有即时编译器 JIT,其目的是为了对热点代码做进一步的优化,如果一个代码临界区被判断为不会产生线程安全问题(即不会逃离作用域),那么就算我们有偏向锁优化,JIT 也会直接认为不需要加锁,进而把 synchronized 直接去掉。
wait notify
基本概念
wait notify 用于处理线程需要额外处理条件的情况:
- 在重量级锁中,owner 线程发现执行代码的条件不满足,需要别的线程先得到结果,但是二者同时都使用同一把锁,这个时候 owner 会调用 wait 方法,进入 waitSet 中变为 WAITING 状态。
- BLOCKED 和 WAITING 状态都属于阻塞,不占用 CPU 时间片。
- BLOCKED 线程会在 owner 线程释放锁时被唤醒。
- WAITING 线程会在 owner 线程调用 notify 或者 notifyAll 时唤醒,旦唤醒后不意味着立刻获得锁,仍需进入 entryList 中重新竞争。
API 介绍
wait()
:让进入 object 监视器的线程到 waitSet 等待。wait(long n)
:有时限的等待, 到 n 毫秒后结束等待,或是被 notify。notify()
:在 object 上正在 waitSet 等待的线程中挑一个唤醒。notifyAll()
:让 object 上正在 waitSet 等待的线程全部唤醒。
它们都是线程之间进行协作的手段,都属于同一个对象的方法。必须获得此对象的锁,才能调用这几个方法。
// 锁对象
private final static Object obj = new Object();
public static void main(String[] args) throws Exception {
new Thread(() -> {
synchronized (obj) {
log.debug("执行...");
try {
obj.wait(); // 让线程在obj上一直等待下去
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其他代码...");
}
}).start();
new Thread(() -> {
synchronized (obj) {
log.debug("执行...");
try {
obj.wait(); // 让线程在obj上一直等待下去
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("其他代码...");
}
}).start();
// 主线程两秒后执行
Thread.sleep(2000);
log.debug("唤醒obj上的其他线程");
// 需要同样在obj对象锁Monitor里才能唤醒对应的线程,否则唤醒无效
synchronized (obj) {
obj.notify(); // 唤醒obj上一个线程
// obj.notifyAll(); // 唤醒obj上所有线程
}
}
wait 和 sleep 的区别
- sleep 是 Thread 方法,而 wait 是 Object 的方法。
- sleep 不需要强制和 synchronized 配合使用,但 wait 需要 和 synchronized 一起用。
- sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁。
- 它们都会把线程的状态转换为 TIMED_WAITING。
wait notify 的正确使用方法
step1
我们模拟一个线程需要等待一个结果,而后对这个结果进行处理的场景。先来看如下代码:
// 锁对象
private final static Object obj = new Object();
// 是否获得结果
private static boolean hasResult = false;
public static void main(String[] args) throws Exception {
new Thread(() -> {
synchronized (obj) {
log.debug("查询是否有条件结果: {}", hasResult);
if (!hasResult) {
log.debug("无条件结果,等待结果...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("查询是否有条件结果: {}", hasResult);
if (hasResult) {
log.debug("处理结果...");
}
}
}, "t1").start();
for (int i = 0; i < 5; ++i) {
new Thread(() -> {
synchronized (obj) {
log.debug("其他线程操作...");
}
}).start();
}
Thread.sleep(1000);
new Thread(() -> {
// 这里不能加锁,否则结果无法正确处理
hasResult = true;
log.debug("结果处理完毕");
}, "t2").start();
}
通过简单的分析,上述代码有以下缺点:
- 其它干活的线程,都要一直阻塞,效率太低。
- 小南线程必须睡足 2s 后才能醒来,就算结果提前送到,也无法立刻醒来。
- 加了 synchronized 后,线程睡眠并不会释放锁。
我们尝试使用 wait notify 解决。
step2
针对上述问题,我们更改出如下代码:
// 锁对象
private final static Object obj = new Object();
// 是否获得结果
private static boolean hasResult = false;
public static void main(String[] args) throws Exception {
new Thread(() -> {
synchronized (obj) {
log.debug("查询是否有条件结果: {}", hasResult);
if (!hasResult) {
log.debug("无条件结果,等待结果...");
try {
// 使用wait等待
obj.wait(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("查询是否有条件结果: {}", hasResult);
if (hasResult) {
log.debug("处理结果...");
}
}
}, "t1").start();
for (int i = 0; i < 5; ++i) {
new Thread(() -> {
synchronized (obj) {
log.debug("其他线程操作...");
}
}).start();
}
Thread.sleep(1000);
new Thread(() -> {
synchronized (obj) {
hasResult = true;
log.debug("结果处理完毕");
obj.notify();
}
}, "t2").start();
}
可以看到,上述代码解决了等待结果时其他线程阻塞运行的情况。但是,如果有其他线程也在等待,该怎么办呢?
step3
如下代码演示了多个线程等待的情况:
// 锁对象
private final static Object obj = new Object();
private static boolean hasResult1 = false;
private static boolean hasResult2 = false;
public static void main(String[] args) throws Exception {
new Thread(() -> {
synchronized (obj) {
log.debug("查询是否有条件结果1: {}", hasResult1);
if (!hasResult1) {
log.debug("无条件结果1,等待结果1...");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("查询是否有条件结果1: {}", hasResult1);
if (hasResult1) {
log.debug("处理结果1...");
} else {
log.debug("无法处理结果1");
}
}
}, "t1").start();
new Thread(() -> {
synchronized (obj) {
log.debug("查询是否有条件结果2: {}", hasResult2);
if (!hasResult2) {
log.debug("无条件结果2,等待结果2...");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("查询是否有条件结果2: {}", hasResult2);
if (hasResult2) {
log.debug("处理结果2...");
} else {
log.debug("无法处理结果2");
}
}
}, "t2").start();
Thread.sleep(1000);
new Thread(() -> {
synchronized (obj) {
hasResult2 = true;
log.debug("结果处理完毕");
obj.notify();
}
}, "t3").start();
}
实际运行后发现,有于线程顺序的原因,导致 t3 线程本来应该唤醒 t2 线程,但却意外唤醒了 t1 线程,导致 t2 线程一直处于等待状态中无法执行接下来的操作。
上述的错误操作我们称为虚假唤醒。
step4
我们把 notify 方法更改为 notifyAll:
new Thread(() -> {
synchronized (obj) {
hasResult2 = true;
log.debug("结果处理完毕");
obj.notifyAll();
}
}, "t3").start();
运行结果正常。不过,代码依旧有缺点:
- 用 notifyAll 仅解决某个线程的唤醒问题,但使用 if + wait 判断仅有一次机会,一旦条件不成立,就没有重新 判断的机会了。
我们尝试使用 while + wait 的方法来判断,当条件不成立,即可再次 wait。
step5
结合上述分析,更改代码如下:
// 锁对象
private final static Object obj = new Object();
private static boolean hasResult1 = false;
private static boolean hasResult2 = false;
public static void main(String[] args) throws Exception {
new Thread(() -> {
synchronized (obj) {
log.debug("查询是否有条件结果1: {}", hasResult1);
while (!hasResult1) {
log.debug("无条件结果1,等待结果1...");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("查询是否有条件结果1: {}", hasResult1);
if (hasResult1) {
log.debug("处理结果1...");
} else {
log.debug("无法处理结果1");
}
}
}, "t1").start();
new Thread(() -> {
synchronized (obj) {
log.debug("查询是否有条件结果2: {}", hasResult2);
while (!hasResult2) {
log.debug("无条件结果2,等待结果2...");
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("查询是否有条件结果2: {}", hasResult2);
if (hasResult2) {
log.debug("处理结果2...");
} else {
log.debug("无法处理结果2");
}
}
}, "t2").start();
Thread.sleep(1000);
new Thread(() -> {
synchronized (obj) {
hasResult2 = true;
log.debug("结果处理完毕");
obj.notifyAll();
}
}, "t3").start();
}
以上便是最终完整且正确的代码。
总结
正确的 wait notify 的写法模板如下:
synchronized (lock) {
while (条件不成立) {
lock.wait();
}
// 执行业务操作
}
// 另一个线程
synchronized (lock) {
lock.notifyAll();
}
同步模式——保护性暂停
保护性暂停,即 Guarded Suspension,用在一个线程等待另一个线程的执行结果。jdk 当中的 Future 就是使用保护性暂停实现的。
要点:
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject。
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)。
- JDK 中,join 的实现、Future 的实现,采用的就是此模式。
- 因为要等待另一方的结果,因此归类到同步模式。
实现
结合 wait 和 notifyAll 实现保护性暂停:
class GuardedObject {
private Object response;
private final Object lock = new Object();
public Object get() {
synchronized (lock) {
// 条件不满足则等待
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
// 子线程得到response
try {
Thread.sleep(2000);
Object response = new Object();
log.debug("response complete...");
guardedObject.complete(response);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
log.debug("waiting...");
// 主线程阻塞等待
Object response = guardedObject.get();
log.debug("get response: {}", response);
}
}
超时处理
注意考虑虚假唤醒的情况:
@Slf4j
class GuardedObject {
private Object response;
private final Object lock = new Object();
// timeout表示要等待的时间
public Object get(long timeout) {
synchronized (lock) {
// 1.记录最初时间
long begin = System.currentTimeMillis();
// 2.已经经历的时间
long timePassed = 0;
while (response == null) {
// 4.经历了循环后,wait里面的时间要减少
long waitTime = timeout - timePassed;
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3.提前被唤醒或者虚假唤醒
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}", timePassed, response == null);
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
}
join 的原理
join 的底层便是采用我们刚刚介绍的保护性暂停的思想。
public final synchronized void join(final long millis) throws InterruptedException {
if (millis > 0) {
if (isAlive()) {
final long startTime = System.nanoTime();
long delay = millis;
do {
wait(delay);
} while (isAlive() && (delay = millis -
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) > 0);
}
} else if (millis == 0) {
// 条件不满足时wait
while (isAlive()) {
wait(0);
}
} else {
throw new IllegalArgumentException("timeout value is negative");
}
}
多任务
如果有多个线程要来等待结果,多个线程要来处理结果,直接创建多个 GuardedObject 十分不方便。因此我们可以设计一个用来解耦的中间类,这样不仅能够解耦结果等待者和结果生产者,还能够同时支持多个任务的管理。
对 GuardedObject 添加 id 进行标识:
@Slf4j
class GuardedObject {
// id编号,用于在集合中做标识
@Getter
private int id;
public GuardedObject(int id) {
this.id = id;
}
private Object response;
private final Object lock = new Object();
// timeout表示要等待的时间
public Object get(long timeout) {
// ...
}
public void complete(Object response) {
// ...
}
}
编写中间类,进行解耦:
class Futures {
// 使用Hashtable,保证线程安全
private static Map<Integer, GuardedObject> guardedObjects = new Hashtable<>();
private static int id = 1;
// 产生唯一id
private static synchronized int generateId() {
return id++;
}
// 下述两个方法因为是直接操作Hashtable,具有线程安全性,所以不用手动加synchronized
public static GuardedObject createGuardedObject() {
GuardedObject guardedObject = new GuardedObject(generateId());
guardedObjects.put(guardedObject.getId(), guardedObject);
return guardedObject;
}
public static GuardedObject getGuardedObject(int id) {
// 结果返回后,GuardedObject可以舍去
return guardedObjects.remove(id);
}
public static Set<Integer> getAllIds() {
return guardedObjects.keySet();
}
}
编写生产者和消费者,模拟生成结果和消费结果:
@Slf4j
class People extends Thread {
@Override
public void run() {
GuardedObject guardedObject = Futures.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容: {}", guardedObject.getId(), mail);
}
}
@Slf4j
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Futures.getGuardedObject(id);
log.debug("送信 id:{}, 内容: {}", id, mail);
guardedObject.complete(mail);
}
}
测试:
public static void main(String[] args) throws Exception {
for (int i = 0; i < 3; ++i) {
// 等待结果
new People().start();
}
Thread.sleep(1000);
// 获取所有编号,好让我们知道有哪些信要送
Set<Integer> allIds = Futures.getAllIds();
allIds.forEach(id -> {
// 一个消费者对应一个生产者
new Postman(id, "内容" + id).start();
});
}
与生产者消费者模式区别的是:保护性暂停中,消费者与生产者是一一对应的,也就是说,一个消费者有专门的一个生产者为其生产结果。
异步模式——生产者消费者
要点:
- 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应。
- 消费队列可以用来平衡生产和消费的线程资源。
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据。
- 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据。
- JDK 中各种阻塞队列,采用的就是这种模式。
实现
消息类:
@Getter
@AllArgsConstructor
final class Message {
private int id;
private Object value;
}
消息队列:
class MessageQueue {
// 双向队列,用于从一头放消息,另一头取消息
private LinkedList<Message> list = new LinkedList<>();
// 队列容量
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
}
// 获取消息
public Message take() {
synchronized (list) {
// 为空时,等待
while (list.isEmpty()) {
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 返回消息,并唤醒生产者来生产
Message message = list.removeFirst();
list.notifyAll();
return message;
}
}
// 存入消息
public void put(Message message) {
synchronized (list) {
// 检查队列是否满了
while (list.size() == capacity) {
try {
// 库存已达上限,不能再放入了
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 加消息,并通知消费者消费
list.addLast(message);
list.notifyAll();
}
}
}
测试:
public static void main(String[] args) throws Exception {
// 消息队列
MessageQueue messageQueue = new MessageQueue(2);
// 三个生产者线程
for (int i = 0; i < 3; ++i) {
int id = i;
new Thread(() -> {
try {
Thread.sleep(1000);
log.debug("put message id:{}", id);
messageQueue.put(new Message(id, new Object()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "生产者" + i).start();
}
// 一个消费者线程
new Thread(() -> {
while (true) {
try {
Thread.sleep(500);
Message message = messageQueue.take();
log.debug("takeMessage id:{}, value: {}", message.getId(), message.getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "消费者").start();
}
park unpark
基本使用
它们是 LockSupport 类中的方法:
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(thread);
Thread t1 = new Thread(() -> {
log.debug("start...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
Thread.sleep(2000);
log.debug("unpark...");
LockSupport.unpark(t1);
值得注意的是,如果先 unpark 再 park 的话,同样也可以让线程正常运行而不会阻塞住。
与 Object 的 wait & notify 相比,park unpark 的特点:
- wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必。
- park unpark 是以线程为单位来阻塞和唤醒线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么精确。
- park & unpark 可以先 unpark,而 wait & notify 不能先 notify。
原理
park unpark 是基于信号量来实现的。
每个线程都有自己的一个 Parker 对象,由三部分组成 _counter
,_cond
和 _mutex
。
- 调用 park 方法时:
- 检查
_counter
,如果为 1,获得_mutex
互斥锁。 - 如果为 0,线程进入
_cond
条件变量阻塞。 - 为 1 时,线程正常运行,旦设置
_counter
为 0。
- 检查
- 调用 unpark 方法时:
- 唤醒
_cond
条件变量中的阻塞线程。 - 设置
_counter
为 1。
- 唤醒
线程的状态转换
我们假设有线程 Thread t
。
NEW -> RUNNABLE
当调用 t.start()
方法时,由 NEW 转换为 RUNNABLE。
RUNNABLE <-> WAITING
t 线程用 synchronized(obj)
获取了对象锁后:
- 调用
obj.wait()
方法时,t 线程从 RUNNABLE 转换为 WAITING。 - 调用
obj.notify()
,obj.notifyAll()
,t.interrupt()
时:- 竞争锁成功,t 线程从 WAITING 转换为 RUNNABLE。
- 竞争锁失败,t 线程从 WAITING 转换为 BLOCKED。
RUNNABLE <-> WAITING
- 当前线程调用
t.join()
方法时,当前线程从 RUNNABLE 转换为 WAITING。(注意是当前线程在 t 线程对象的监视器上等待) - t 线程运行结束,或调用了当前线程的
interrupt()
时,当前线程从 WAITING 转换为 RUNNABLE。
RUNNABLE <-> WAITING
- 当前线程调用
LockSupport.park()
方法时,当前线程从 RUNNABLE 转换为 WAITING。 - 调用
LockSupport.unpark(目标线程)
或调用了线程的interrupt()
,会让目标线程从 WAITING 转换为 RUNNABLE。
RUNNABLE <-> TIMED_WAITING
t 线程用 synchronized(obj)
获取了对象锁后:
- 调用
obj.wait(long n)
方法时,t 线程从 RUNNABLE 转换为 TIMED_WAITING。 - t 线程等待时间超过 n 毫秒,或者调用
obj.notify()
,obj.notifyAll()
,t.interrupt()
时:- 竞争锁成功,t 线程从 TIMED_WAITING 转换为 RUNNABLE。
- 竞争锁失败,t 线程从 TIMED_WAITING 转换为 BLOCKED。
RUNNABLE <-> TIMED_WAITING
- 当前线程调用
t.join(long n)
方法时,当前线程从 RUNNABLE 转换为 TIMED_WAITING。(注意是当前线程在 t 线程对象的监视器上等待) - 当前线程等待时间超过了 n 毫秒,或 t 线程运行结束,或调用了当前线程的
interrupt()
时,当前线程从 TIMED_WAITING 转换为 RUNNABLE。
RUNNABLE <-> TIMED_WAITINIG
- 当前线程调用
Thread.sleep(long n)
,当前线程从 RUNNABLE 转换为 TIMED_WAITING。 - 当前线程等待时间超过 n 毫秒,当前线程从 TIMED_WAITING 转换为 RUNNABLE。
RUNNABLE <-> TIMED_WAITING
- 当前线程调用
LockSupport.parkNanos(long nanos)
或LockSupport.parkUntil(long millis)
时,当前线程从 RUNNABLE 转换为 TIMED_WAITING。 - 调用
LockSupport.unpark(目标线程)
或调用了线程的interrupt()
,或是等待超时,会让目标线程从 TIMED_WAITING 转换为 TIMED_WAITING。
RUNNABLE <-> BLOCKED
- t 线程用
synchronized(obj)
获取了对象锁时如果竞争失败,从 RUNNABLE 转化为 BLOCKED。 - 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争成功,从 BLOCKED 转换为 RUNNABLE,其他失败的线程仍然 BLOCKED。
RUNNABLE <-> TERMINATED
当前线程所有代码运行完毕,进入 TERMINATED。
多把锁
如果有多个任务要保证原子性,但是它们又共享同一把锁,此时并发度是很低的:
class Lock {
public void method1() {
synchronized (this) {
// ...
}
}
public void method2() {
synchronized (this) {
// ...
}
}
}
我们可以在 Lock 中增加其他的锁对象,来提高并发度:
class Lock {
private final Object lock1 = new Object();
private final Object lock2 = new Object();
public void method1() {
synchronized (lock1) {
// ...
}
}
public void method2() {
synchronized (lock2) {
// ...
}
}
}
也就是说,可以将锁的粒度细分,以提高并发度。不过,如果一个线程需要同时获得多把锁,就容易发生死锁。
活跃性
死锁
有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁。
t1 线程获得 A 对象锁,接下来想获取 B 对象锁。t2 线程获得 B 对象锁,接下来想获取 A 对象锁:
private final static Object A = new Object();
private final static Object B = new Object();
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
synchronized (A) {
log.debug("lock A");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (B) {
log.debug("lock B");
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (B) {
log.debug("lock B");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (A) {
log.debug("lock A");
}
}
}, "t2");
t1.start();
t2.start();
}
检测死锁可以用 jconsole 工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁。
活锁
活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束:
new Thread(() -> {
// 期望减到0退出循环
while (count > 0) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
--count;
log.debug("count:{}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过20退出循环
while (count < 20) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
++count;
log.debug("count:{}", count);
}
}, "t2").start();
死锁的两个线程都阻塞了,但是活锁的两个线程却一直在占用 cpu 资源。
饥饿
很多教程中把饥饿定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不 易演示,讲读写锁时会涉及饥饿问题。
ReentrantLock
ReentrantLock(可重入锁),属于 juc 并发工具包下的重要类。相对于 synchronized 具备以下特点:
- 可中断。
- 可以设置超时时间。
- 可以设置为公平锁。
- 可以支持多个条件变量。
- 与 synchronized 一样,也支持重入。
基本语法:
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁。如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
可打断
可打断的意思是,当一个线程正在等待锁释放的时候,将其打断:
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
// 获取可以被打断的锁
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
Thread.sleep(1000);
t1.interrupt();
log.debug("执行打断");
} finally {
lock.unlock();
}
}
锁超时
锁立刻失败的情况,主动地避免锁死等:
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
log.debug("启动...");
if (!lock.tryLock()) {
log.debug("获取立刻失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
Thread.sleep(2000);
} finally {
lock.unlock();
}
}
锁设置超时时间的情况,在这段时间内,会尝试加锁,如果超时后锁都加不上,则放弃加锁:
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("获取立刻失败,返回");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
Thread.sleep(2000);
} finally {
lock.unlock();
}
}
公平锁
不公平锁,即当一个锁被占用,其他的线程被迫等待后,锁释放的那一瞬间,所有等待的线程一拥而上,谁先抢到锁的使用权,谁就能先占用锁,而不是先来先得。
ReentrantLock 默认是不公平的,可以使用如下方法改为公平锁:
private static ReentrantLock lock = new ReentrantLock(true);
公平锁一般是没有比要的,会降低并发度,后续的原理中会讲到。
条件变量
synchronized 中也有条件变量,就是我们讲原理时那个 waitSet,当条件不满足时进入 waitSet 等待。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比:
- synchronized 是那些不满足条件的线程都在一个 waitSet 等消息。
- 而 ReentrantLock 支持多个条件变量,对功能进行了细分。
使用要点:
- await 前需要获得锁。
- await 执行后,会释放锁,进入 conditionObject 等待。
- await 的线程被唤醒(或打断、或超时)去重新竞争 lock 锁。
- 竞争 lock 锁成功后,从 await 后继续执行。
private static ReentrantLock lock = new ReentrantLock();
private static boolean hasResult = false;
public static void main(String[] args) throws Exception {
Condition condition = lock.newCondition();
new Thread(() -> {
try {
lock.lock();
while (!hasResult) {
try {
condition.await(); // 在指定的条件变量上等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到结果");
} finally {
lock.unlock();
}
}).start();
Thread.sleep(1000);
sendResult(condition);
}
private static void sendResult(Condition condition) {
// 得在同一个锁里面才能正确唤醒,不加锁唤醒不了,会抛异常
lock.lock();
try {
log.debug("生产结果");
hasResult = true;
condition.signal();
} finally {
lock.unlock();
}
}
同步模式——固定顺序执行
有些时候,我们需要固定某些操作的执行顺序,这个时候,可以采用加锁的方式来实现(比如我们先打印 2 后打印 1):
wait notify
// 锁对象
private static final Object lock = new Object();
// 表示t2是否运行过
private static boolean isT2Runned = false;
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (!isT2Runned) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("1");
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
log.debug("2");
isT2Runned = true;
lock.notifyAll();
}
}, "t2");
t1.start();
t2.start();
}
ReentrantLock 的实现方法和上述差不多,这里不再赘述。
park unpark
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(() -> {
LockSupport.park();
log.debug("1");
}, "t1");
Thread t2 = new Thread(() -> {
log.debug("2");
LockSupport.unpark(t1);
}, "t2");
t1.start();
t2.start();
}
同步模式——交替输出
假设线程 1 会输出 a 五次,线程 2 会输出 b 五次,线程 3 会输出 c 五次。现在要求输出 abcabcabc… 怎么实现?
wait notify
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
WaitNotify waitNotify = new WaitNotify(1, 5);
// 三个线程同时操作一个对象
new Thread(() -> {
waitNotify.print("a", 1, 2);
}, "t1").start();
new Thread(() -> {
waitNotify.print("b", 2, 3);
}, "t2").start();
new Thread(() -> {
waitNotify.print("c", 3, 1);
}, "t3").start();
}
}
@Slf4j
@AllArgsConstructor
class WaitNotify {
private int flag; // 等待标记
private int loopNumber; // 循环次数
public void print(String msg, int waitFlag, int nextFlag) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (this.flag != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug(msg);
flag = nextFlag;
this.notifyAll();
}
}
}
}
await signal
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
AwaitSignal awaitSignal = new AwaitSignal(5);
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
new Thread(() -> {
awaitSignal.print("a", a, b);
}, "t1").start();
new Thread(() -> {
awaitSignal.print("b", b, c);
}, "t2").start();
new Thread(() -> {
awaitSignal.print("c", c, a);
}, "t3").start();
// 主线程先把a唤醒
Thread.sleep(1000);
awaitSignal.lock();
try {
a.signal();
} finally {
awaitSignal.unlock();
}
}
}
@Slf4j
@AllArgsConstructor
class AwaitSignal extends ReentrantLock {
private int loopNumber;
public void print(String msg, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
lock();
try {
current.await();
log.debug(msg);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
unlock();
}
}
}
}
park unpark
@Slf4j
public class JUCTest {
private static Thread t1;
private static Thread t2;
private static Thread t3;
public static void main(String[] args) throws Exception {
ParkUnpark parkUnpark = new ParkUnpark(5);
t1 = new Thread(() -> {
parkUnpark.print("a", t2);
}, "t1");
t2 = new Thread(() -> {
parkUnpark.print("b", t3);
}, "t2");
t3 = new Thread(() -> {
parkUnpark.print("c", t1);
}, "t3");
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1);
}
}
@Slf4j
@AllArgsConstructor
class ParkUnpark {
private int loopNumber;
public void print(String msg, Thread next) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
log.debug(msg);
LockSupport.unpark(next);
}
}
}
共享模型之内存
上一章讲解的 Monitor 主要关注的是访问共享变量时,保证临界区代码的原子性。这一章我们进一步深入学习共享变量在多线程间的可见性问题与多条指令执行时的有序性问题。
Java 内存模型
JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、 CPU 指令优化等。
JMM 体现在以下几个方面:
- 原子性:保证指令不会受到线程上下文切换的影响。
- 可见性:保证指令不会受 cpu 缓存的影响。
- 有序性:保证指令不会受 cpu 指令并行优化的影响。
可见性
退不出的循环
先来看一个现象,main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止:
private static boolean run = true;
public static void main(String[] args) throws Exception {
new Thread(() -> {
while (run) {
// ...
}
}, "t").start();
Thread.sleep(1000);
log.debug("程序尝试停止...");
run = false; // 线程t不会如预想的一样停下来
}
导致上面的现象出现的原因是:
- 初始状态,t 线程刚开始从主内存读取了 run 值到自己的工作内存中。
- 因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中, 减少对主存中 run 的访问,提高效率。
- 1 秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量 的值,结果永远是旧值,自然无法停止。
上述描述的就是不可见问题,一个线程对变量的修改,对于另一个线程不可见。
解决方法
不可见问题的解决也很简单,在变量前加上 volatile 关键字即可。它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存:
private static volatile boolean run = true;
虽然在效率上有所损失,但是保证了可见性。
可见性 vs 原子性
前面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对 volatile 变量的修改对另一个线程可见, 不能保证原子性,仅用在一个写线程,多个读线程的情况。
需要注意的是:synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性(前提是变量完全被其保护起来,否则还是会导致指令重排)。但缺点是 synchronized 是属于重量级操作,性能相对更低。
如果在前面示例的死循环中加入 System.out.println()
会发现即使不加 volatile 修饰符,线程 t 也能正确看到 对 run 变量的修改了。原因是,在 System.out.println()
的底层代码中,使用到了 synchronized,保证了可见性。
终止模式——两阶段终止
我们在之前已经介绍过了使用中断来实现两阶段终止,其实,使用 volatile 也可以实现两阶段终止:
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
TwoPhaseTermination twoPhaseTermination = new TwoPhaseTermination();
// 启动监控线程
twoPhaseTermination.start();
// 主线程等待3s后关闭监控线程
Thread.sleep(3500);
twoPhaseTermination.stop();
}
}
@Slf4j
class TwoPhaseTermination {
private Thread monitor;
// 停止标记,当stop为真,表示线程应该停止了
private volatile boolean stop;
// 启动监控线程
public void start() {
monitor = new Thread(() -> {
while (true) {
// 频繁读取stop变量,需要用volatile保证可见性
if (stop) {
log.debug("善后操作...");
break;
}
try {
Thread.sleep(1000);
log.debug("执行监控记录");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
monitor.start();
}
// 停止监控线程
public void stop() {
stop = true;
}
}
同步模式——犹豫模式
Balking (犹豫)模式用在一个线程发现另一个线程或本线程已经做了某一件相同的事,那么本线程就无需再做 了,直接结束返回。
上述的监控代码还有一点小问题,就是当我们重复调用 twoPhaseTermination.start()
的时候,会不断创建监控线程出来,实际上是没有必要的,监控线程只需要一个即可,我们可以利用一个变量来解决这个问题:
// starting为真,表示已经有监控线程了
private boolean starting = false;
// 启动监控线程
public void start() {
// 保证多线程安全
synchronized (this) {
if (starting) {
return;
}
starting = true;
}
monitor = new Thread(() -> {
// ...
});
monitor.start();
}
有序性
JVM 会在不影响正确性的前提下,可以调整语句的执行顺序,思考下面一段代码:
static int i;
static int j;
// 在某个线程内执行如下赋值操作
i = ...;
j = ...;
可以看到,至于是先执行 i 还是 先执行 j ,对最终的结果不会产生影响。这种特性称之为指令重排,多线程下指令重排会影响正确性。
为什么要有重排指令这项优化呢?从 CPU 执行指令的原理来理解一下吧。
指令重排
计算机系统中我们已学过:每条指令都可以分为: 取指令(IF) - 指令译码(ID) - 执行指令(EX) - 内存访问(MEM) - 数据写回(WB),这 5 个阶段。在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行。指令重排的前提是,重排指令不能影响结果。
使用 volatile 修饰变量,可以禁止重排序。
volatile 原理
volatile 的底层实现原理是内存屏障,Memory Barrier(Memory Fence):
- 对 volatile 变量的写指令后会加入写屏障。
- 对 volatile 变量的读指令前会加入读屏障。
如何保证可见性
写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中:
public void actor2(I_Result r) {
num = 2;
ready = true; // ready是volatile赋值,带写屏障
// 写屏障,上述的ready和num变量都会被同步
}
而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据:
public void actor1(I_Result r) {
// 读屏障,下述的ready和num变量都会被同步
// ready是volatile读,带读屏障
if (ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
如何保证有序性
写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后:
public void actor2(I_Result r) {
num = 2;
ready = true; // ready是volatile赋值,带写屏障
// 写屏障,上述的代码不会被重排到后面
}
读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前:
public void actor1(I_Result r) {
// 读屏障,下述的代码不会被重排到前面
// ready是volatile读,带读屏障
if (ready) {
r.r1 = num + num;
} else {
r.r1 = 1;
}
}
dcl 问题
double-checked locking,双重检查锁,是单例模式中保证多线程安全的一种手段。一般的单例模式,为了防止造成线程安全问题,需要对整个方法加锁:
public final class Singleton {
private Singleton() {}
private static Singleton instance = null;
// 对整个方法上锁
public static synchronized Singleton getInstance() {
if (instance == null) {
instance = new Singleton();
}
return instance;
}
}
为了让竞争减少,我们会使用双重检查锁来进一步优化:
public final class Singleton {
private Singleton() {}
private static Singleton instance = null;
public static Singleton getInstance() {
// 首次访问直接同步,如果有实例就直接返回,避免了上锁
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
这段代码看似已经没问题了,但实际上,针对 instance = new Singleton();
这一行代码,jvm 的激进优化可能导致指令的重排。一般顺序是先把对象创建出来,调用构造方法后赋值给 instance。但是,在指令重排后,有可能对象一创建出来就先赋值给了 instance,再去调用构造方法。上述的指令重排就会导致线程可能拿到一个没有完全实例化完毕的对象。
要解决上述问题也非常简单,对 instance 使用 volatile 修饰即可,可以禁用指令重排。但要注意在 JDK 5 以上的版本的 volatile 才会真正有效:
public final class Singleton {
private Singleton() {}
private static volatile Singleton instance = null;
public static Singleton getInstance() {
// 读屏障-------------------------------------
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
// 写屏障-------------------------
}
}
}
return instance;
}
}
如上述所示,读屏障和写屏障共同保护了赋值语句不会被指令重排,成功解决了该问题。
happens-before
happens-before 规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结,抛开以下 happens-before 规则,JMM 并不能保证一个线程对共享变量的写,对于其它线程对该共享变量的读可见。
线程解锁 m 之前对变量的写,对于接下来对 m 加锁的其它线程对该变量的读可见:
static int x; static Object m = new Object(); new Thread(() -> { synchronized (m) { x = 10; } }, "t1").start(); new Thread(() -> { synchronized (m) { System.out.println(x); } }, "t2").start();
线程对 volatile 变量的写,对接下来其它线程对该变量的读可见:
volatile static int x; new Thread(() -> { x = 10; }, "t1").start(); new Thread(() -> { System.out.println(x); }, "t2").start();
线程 start 前对变量的写,对该线程开始后对该变量的读可见:
static int x; x = 10; new Thread(() -> { System.out.println(x); }, "t2").start();
线程结束前对变量的写,对其它线程得知它结束后的读可见(比如其它线程调用
t1.isAlive()
或t1.join()
等待它结束):static int x; Thread t1 = new Thread(() -> { x = 10; }, "t1"); t1.start(); t1.join(); System.out.println(x);
线程 t1 打断 t2(interrupt)前对变量的写,对于其他线程得知 t2 被打断后对变量的读可见(通过
t2.interrupted
或t2.isInterrupted
):static int x; public static void main(String[] args) { Thread t2 = new Thread(() -> { while (true) { if (Thread.currentThread().isInterrupted()) { System.out.println(x); break; } } }, "t2"); t2.start(); new Thread(() -> { sleep(1); x = 10; t2.interrupt(); }, "t1").start(); while (!t2.isInterrupted()) { Thread.yield(); } System.out.println(x); }
对变量默认值(0,false,null)的写,对其它线程对该变量的读可见。
具有传递性,如果
x hb->y
并且y hb->z
那么有x hb->z
,配合 volatile 的防指令重排,有下面的例子:volatile static int x; static int y; new Thread(() -> { y = 10; x = 20; }, "t1").start(); new Thread(() -> { // x=20对t2可见,同时y=10也对t2可见 System.out.println(x); }, "t2").start();
共享模型之无锁
无锁解决原子性问题
现在有一个账户类如下,其中有关于转账的一些方法:
class Account {
private Integer balance;
public synchronized Integer getBalance() {
return balance;
}
public synchronized void withdraw(Integer amount) {
balace -= amount;
}
}
可以看到,我们使用了 synchronized 对方法进行上锁,保证了转账时因为多线程导致的线程安全问题。
当然,上述问题也可以采用另一种无锁的方式来解决:
class AccountCas {
private AtomicInteger balance;
public Integer getBalance() {
return balance.get();
}
public void withdraw(Integer amount) {
while (true) {
// 获取余额的最新值
int prev = balance.get();
// 要修改的余额
int next = prev - amount;
// 真正修改
if (balance.compareAndSet(prev, next)) {
break;
}
}
// 可以简化为下面的方法
// balance.addAndGet(-1 * amount);
}
}
CAS 和 volatile
CAS
前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?
实际上,是 compareAndSet 方法在起作用:
- 在 set 前,先比较 prev 与当前值。
- 如果二者不一致,则 next 作废,返回 false 表示失效。(可能此时别的线程已经做了操作,导致获取的 prev 已经不是最新的了)
- 如果二者一致,则把 next 作为新值,返回 true 表示成功。
其实 CAS 的底层是 lock cmpxchg
指令(X86 架构),在单核 CPU 和多核 CPU 下都能够保证比较-交换的原子性。
在多核状态下,某个核执行到带 lock 的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
CAS 锁
我们之前学过 AtomicInteger,保证操作的原子性,如果要用变量来表示锁,可以这么写:
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
LockCas lock = new LockCas();
new Thread(() -> {
log.debug("begin...");
lock.lock();
try {
log.debug("working...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
log.debug("begin...");
lock.lock();
try {
log.debug("working...");
} finally {
lock.unlock();
}
}).start();
}
}
@Slf4j
class LockCas {
private AtomicInteger lock = new AtomicInteger(0);
public void lock() {
while (true) {
if (lock.compareAndSet(0, 1)) {
break;
}
}
log.debug("lock...");
}
public void unlock() {
lock.set(0);
log.debug("unlock...");
}
}
不过,因为需要让线程自旋,所以很消耗 CPU,实践中不建议自己这么写。
volatile
观察 AtomicInteger 源码,我们不难发现,其内部封装了一个实际的变量值,称为 value:
public class AtomicInteger extends Number implements java.io.Serializable {
// ...
private volatile int value;
}
可以看到,这个 value 被 volatile 关键字修饰。我们之前说过,它可以用来修饰成员变量和静态成员变量,让线程必须到主存中获取变量的值,避免线程从自己的工作缓存中查找变量的值。线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现比较并交换的效果。
为什么无锁效率高
- 无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而 synchronized 会让线程在没有获得锁的时 候,发生上下文切换,进入阻塞。
- 打个比喻,线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速…恢复到高速运行,代价比较大。
- 但无锁情况下,因为线程要保持运行,需要额外 CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。
总的来说,CAS 必须在多核 CPU 下才能保证优势,并且线程数不能大于 CPU 核数,否则性能并不是很好。
CAS 特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
- CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
- synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
- CAS 体现的是无锁并发、无阻塞并发:
- 因为没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一。
- 但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响。
原子整数
juc 并发包提供了:
- AtomicBoolean
- AtomicInteger
- AtomicLong
以 AtomicInteger 为例:
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
AtomicInteger i = new AtomicInteger(0);
System.out.println(i.getAndIncrement()); // 类似于i++
System.out.println(i.incrementAndGet()); // 类似于++i
System.out.println(i.getAndDecrement()); // 类似于i--
System.out.println(i.decrementAndGet()); // 类似于--i
System.out.println(i.getAndAdd(5)); // 获取并加值
System.out.println(i.addAndGet(-5)); // 加值后获取
System.out.println(i.updateAndGet(value -> value * 2)); // 原子性的乘法操作
}
}
原子引用
juc 并发包提供了:
- AtomicReference
- AtomicMarkableReference
- AtomicStampedReference
为什么需要原子引用呢?原因是,我们要保护的往往不是一个简单的变量,而是一个对象引用。以 AtomicReference 为例:
class DecimalAccountSafeCas {
private AtomicReference<BigDecimal> ref;
public DecimalAccountSafeCas(BigDecimal balance) {
ref = new AtomicReference<>(balance);
}
public BigDecimal getBalance() {
return ref.get();
}
public void withdraw(BigDecimal amount) {
while (true) {
BigDecimal prev = ref.get();
BigDecimal next = prev.substract(amount);
if (ref.compareAndSet(prev, next)) {
break;
}
}
}
}
ABA 问题
看如下代码:
@Slf4j
public class JUCTest {
private static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws Exception {
log.debug("main start...");
// 获取现有值
String prev = ref.get();
// 中间有其他线程干扰,发生ABA现象
other();
Thread.sleep(1000);
// 尝试改为C
log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}
public static void other() throws InterruptedException {
new Thread(() -> {
log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
}, "t2").start();
}
}
观察输出后发现,主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又 改回 A 的情况。
也就是说,当其他线程对共享变量进行修改的时候,只要修改前和修改后的结果一致,那么一开始的线程就无法感知到共享变量的修改。此为 ABA 问题。
大部分情况下,这种问题的发生实际上对业务不会有很大影响,旦也有可能存在隐患。如果主线程希望只要有其它线程修改过了共享变量,那么自己的 cas 就算失败。这时,仅比较值是不够的,需要再加一个版本号。
AtomicStampedReference
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次:
@Slf4j
public class JUCTest {
// 构造的时候还要添加一个版本号,初始值我们给0
private static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);
public static void main(String[] args) throws Exception {
log.debug("main start...");
// 获取现有值以及版本号
String prev = ref.getReference();
int stamp = ref.getStamp();
log.debug("{}", stamp);
// 即使中间有其他线程干扰,但是有版本号的介入,ABA问题可以杜绝
other();
Thread.sleep(1000);
// 尝试改为C,并更改后把版本号加1
log.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}
public static void other() throws InterruptedException {
new Thread(() -> {
int stamp = ref.getStamp();
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", stamp, stamp + 1));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
int stamp = ref.getStamp();
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", stamp, stamp + 1));
}, "t2").start();
}
}
AtomicMarkableReference
使用 AtomicStampedReference 可以通过版本号来判断变量是否被更改过。但是有时候,我们并不关心变量被更改了几次,只是单纯关心变量是否被修改过。所以就有了 AtomicMarkableReference。
@Slf4j
public class JUCTest {
// 构造的时候加入一个标记,表示是否被修改过
private static AtomicMarkableReference<String> ref = new AtomicMarkableReference<>("A", false);
public static void main(String[] args) throws Exception {
log.debug("main start...");
// 获取现有值以及是否被修改过的标记
String prev = ref.getReference();
boolean marked = ref.isMarked();
log.debug("{}", marked);
other();
Thread.sleep(1000);
// 尝试改为C,并把修改过的标记改成true
log.debug("change A->C {}", ref.compareAndSet(prev, "C", marked, true));
}
public static void other() throws InterruptedException {
new Thread(() -> {
boolean marked = ref.isMarked();
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B", marked, true));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
boolean marked = ref.isMarked();
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A", marked, true));
}, "t2").start();
}
}
原子数组
juc 并发包提供:
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
上述对象保证了数组里面的元素的操作原子性。具体 API 不再赘述。
字段更新器
juc 并发包提供:
- AtomicReferenceFieldUpdater
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用这些字段更新器,可以针对对象的某个字段(Field)进行原子操作,只能配合 volatile 关键字使用,否则会出现异常。
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
Student student = new Student();
// 使用字段更新器在保证线程安全的情况下更新对象中的字段
AtomicReferenceFieldUpdater<Student, String> updater =
AtomicReferenceFieldUpdater
.newUpdater(Student.class, String.class, "name");
// 更新字段(对象,原始值,新值)
updater.compareAndSet(student, null, "张三");
System.out.println(student.getName());
}
}
@Getter
class Student {
// 需要使用volatile修饰字段
volatile String name;
}
原子累加器
在 java 8 之后,新增加了几个专门用来进行累加操作的类,其性能必 AtomicLong 等要高很多:
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
其性能提升的原理也很简单,原子累加器中会设置多个累加单元,线程 1 操作累加单元 1,线程 2 操作累加单元 2…最后再将结果汇总。这样子可以在一定程度上避免 CAS 的多次重试失败,从而提高效率。
LongAdder详解
LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧。
LongAdder 类有几个关键域(transient 保证在序列化的时候不会把该字段序列化出去):
// 累加单元数组,懒惰初始化
transient volatile Cell[] cells;
// 基础值,如果没有竞争,则用cas累加这个域
transient volatile long base;
// 在cells创建或扩容时,置为1,表示加锁
transient volatile int cellsBusy;
伪共享
LongAdder 使用 Cell 来表示累加单元:
// 防止缓存行伪共享
@sum.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) {
value = x;
}
// 最重要的方法,用来cas方式进行累加,prev表示旧值,next表示新值
final boolean cas(long prev, long next) {
return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);
}
// ...
}
其中,@sum.misc.Contended
是为了防止缓存行伪共享:
- 计算机系统中,除了内存,还有缓存。因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
- 而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)。缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中。CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效。
- 因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因此缓存行可以存下 2 个的 Cell 对象。
- 一旦同一个缓存行中存在多个 Cell 对象,就会导致多个线程修改缓存行中某个 Cell 对象的时候,导致整个缓存行失效。从而起不到缓存提速的效果。
- 而
@sum.misc.Contended
就是用来防止这种缓存行失效的现象,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行。这样,不会造成对方缓存行的失效。
Unsafe
Unsafe 对象提供了非常底层的,操作内存、线程的方法(老版本中的 Unsafe 无法直接调用,需要通过反射获得,该类涉及底层操作,不建议使用,不安全):
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
Teacher teacher = new Teacher();
Unsafe unsafe = Unsafe.getUnsafe();
// 获取属性偏移量
long idOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("id"));
long nameOffset = unsafe.objectFieldOffset(Teacher.class.getDeclaredField("name"));
// 执行cas操作
unsafe.compareAndSwapInt(teacher, idOffset, 0, 1);
unsafe.compareAndSwapObject(teacher, nameOffset, null, "张三");
// 验证结果
System.out.println(teacher);
}
}
@Data
class Teacher {
volatile int id;
volatile String name;
}
共享模型之不可变
日期转换问题
下面代码在运行时,由于 SimpleDateFormat 不是线程安全的,有很大几率出现 java.lang.NumberFormatException
或者出现不正确的日期解析结果:
public static void main(String[] args) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
log.debug("{}", sdf.parse("2025-01-15"));
} catch (ParseException e) {
log.error("{}", e.getMessage());
}
}).start();
}
}
同步锁解决
直接上锁可以解决这个问题,但是性能不是很好:
public static void main(String[] args) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
synchronized (sdf) {
try {
log.debug("{}", sdf.parse("2025-01-15"));
} catch (ParseException e) {
log.error("{}", e.getMessage());
}
}
}).start();
}
}
不可变解决
如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在 Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类:
public static void main(String[] args) throws Exception {
DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");
for (int i = 0; i < 10; i++) {
new Thread(() -> {
LocalDate date = dtf.parse("2025-01-15", LocalDate::from);
log.debug("{}", date);
}).start();
}
}
不可变设计
不可变对象,实际是另一种避免竞争的方式。
另一个大家更为熟悉的 String 类也是不可变的,以它为例,说明一下不可变设计的要素:
// 类上的final保证类不可继承,防止子类意外重写父类方法
public final class String implements java.io.Serializable, Comparable<String>, CharSequence {
// 变量上的final保证value引用无法被修改
private final char value[];
// hash值直接保存在私有变量中,没有set方法,避免hash值被更改计算
private int hash;
// ...
}
final 的使用
不可变类中所有属性都是 final 的:
- 属性用 final 修饰保证了该属性是只读的,不能修改。
- 类用 final 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性。
保护性拷贝
针对于要对字符串的修改,可以使用保护性拷贝,比如 substring 方法:
public String substring(int beginIndex, int endIndex) {
int length = length();
checkBoundsBeginEnd(beginIndex, endIndex, length);
if (beginIndex == 0 && endIndex == length) {
return this;
}
int subLen = endIndex - beginIndex;
// 返回的是新创建的字符串
return isLatin1() ? StringLatin1.newString(value, beginIndex, subLen)
: StringUTF16.newString(value, beginIndex, subLen);
}
这种通过创建副本对象来避免共享的手段称之为保护性拷贝(defensive copy)。
final 原理
- 设置 final 变量的原理:
final int a = 20;
,使用 final 赋值时,在赋值后会加入写屏障(同 volatile),保证其他线程不会读到a = 0
。 - 获取 final 变量的原理:读取时直接读对应的值,而不是去调用方法。
享元模式
从我们上述讲的内容来看,为了保证一个对象的不可变性,往往需要使用 final 修饰以及保护性拷贝,这会导致频繁地创建对象,浪费内存空间。
针对上述问题,我们往往会在设计不可变对象的时候关联一个设计模式——享元模式(Flyweight pattern,属于结构模式)。其核心思想就是:当需要创建和之前对象的值相同的对象时,可以直接复用以前的对象,而不需要重复创建。
体现
在 JDK 中 Boolean,Byte,Short,Integer,Long,Character 等包装类提供了 valueOf
方法,例如 Long 的 valueOf 会缓存 -128 ~ 127 之间的 Long 对象,在这个范围之间会重用对象,大于这个范围,才会新建 Long 对 象:
public static Long valueOf(long l) {
final int offset = 128;
if (l >= -128 && l <= 127) { // will cache
return LongCache.cache[(int)l + offset];
}
return new Long(l);
}
注意:
- Byte,Short,Long 缓存的范围都是 -128 ~ 127。
- Character 缓存的范围是 0 - 127。
- Integer 的默认范围是 -128 - 127:
- 最小值不能变。
- 但最大值可以通过调整虚拟机参数
-Djava.lang.Integer.IntegerCache.high
来改变。
- Boolean 缓存了 true 和 false。
自定义连接池
一个线上商城应用,QPS 达到数千,如果每次都重新创建和关闭数据库连接,性能会受到极大影响。 这时预先创建好一批连接,放入连接池。一次请求到达后,从连接池获取连接,使用完毕后再还回连接池,这样既节约了连接的创建和关闭时间,也实现了连接的重用,不至于让庞大的连接数压垮数据库。
接下来,我们采用享元模式,结合上述所有我们提到的有关于线程安全的知识,来自己封装一个连接池满足上述要求:
class Pool {
// 连接池大小,保证大小固定
private final int poolSize;
// 连接对象数组
private Connection[] connections;
// 连接状态数组 0-空闲 1-繁忙,使用线程安全的数组实现
private AtomicIntegerArray status;
public Pool(int poolSize) {
this.poolSize = poolSize;
this.connections = new Connection[poolSize];
this.status = new AtomicIntegerArray(poolSize);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection();
}
}
// 借连接
public Connection borrow() {
// 乐观锁从头到尾遍历一遍,保证原子性
while (true) {
for (int i = 0; i < poolSize; i++) {
if (status.get(i) == 0) {
if (status.compareAndSet(i, 0, 1)) {
return connections[i];
}
}
}
// 没有空闲连接,让当前线程进入等待,避免长时间自旋消耗CPU资源
synchronized (this) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
// Connection与连接在这里是唯一绑定的,可以不用加锁
status.set(i, 0);
// 唤醒线程,争抢空闲连接
synchronized (this) {
this.notifyAll();
}
break;
}
}
}
}
无状态
在 web 阶段学习时,设计 Servlet 时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这 种没有任何成员变量的类是线程安全的。因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为无状态。
共享模型之并发工具
自定义线程池
+-Thread Pool-+
| |
| t1 -----+-->+----Blocking Queue-----+
| | | |
| t2 -----+-->| task1 | task2 | task3 |<------- main
| | | |
| t3 -----+-->+-----------------------+
| |
+-------------+
阻塞队列
阻塞队列是平衡生产者线程和消费者线程之间的桥梁。一般主线程为生产者线程,它不断地把新任务放置到阻塞队列中,而后多个消费者线程只要处于空闲状态,就从阻塞队列里面取任务执行。
拒绝策略:
@FunctionalInterface
public interface RejectPolicy<T> {
void reject(BlockingQueue<T> queue, T task);
}
阻塞队列:
class BlockingQueue<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 容量
private int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
// 队列不满,唤醒生产者线程
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 带超时的获取
public T take(long timeout, TimeUnit unit) {
lock.lock();
try {
// 1.将timeout转换成纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 3.超时无需等待
if (nanos <= 0) {
return null;
}
// 2.返回剩余等待时间
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
// 队列不满,唤醒生产者线程
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T element) {
lock.lock();
try {
while (queue.size() == capacity) {
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
// 队列不空,唤醒消费者线程
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 带超时时间的阻塞添加
public boolean offer(T element, long timeout, TimeUnit unit) {
lock.lock();
try {
long nanos = unit.toNanos(timeout);
while (queue.size() == capacity) {
try {
if (nanos <= 0) {
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element);
// 队列不空,唤醒消费者线程
emptyWaitSet.signal();
return true;
} finally {
lock.unlock();
}
}
// 获取大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
// 根据不同策略来执行put操作
// 1.死等
// 2.带超时等待
// 3.让调用者放弃任务执行
// 4.让调用者抛出异常
// 5.让调用者自己执行任务
// ...
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
// 判断队列是否满
if (queue.size() == capacity) {
// 执行拒绝策略
rejectPolicy.reject(this, task);
} else { // 有空闲
queue.addLast(task);
emptyWaitSet.signal();
}
} finally {
lock.unlock();
}
}
}
线程池
public class ThreadPool {
// 任务队列
private BlockingQueue<Runnable> taskQueue;
// 线程集合
private HashSet<Worker> workers = new HashSet();
// 线程数
private int coreSize;
// 获取任务的超时时间
private long timeout;
private TimeUnit unit;
// 拒绝策略
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit unit, int queueCapacity, RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.unit = unit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
// 线程封装
public class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 执行传入的任务和队列中的任务
while (task != null || (task = taskQueue.take()) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
task = null;
}
}
// 没任务了,移除线程
synchronized (workers) {
workers.remove(this);
}
}
}
// 执行任务
public void execute(Runnable task) {
// 当任务数没有超过核心线程数时,直接交给worker对象执行
synchronized (workers) {
if (workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
// 任务数超过核心线程数,执行不同策略
taskQueue.tryPut(rejectPolicy, task);
}
}
}
}
ThreadPoolExecutor
线程池状态
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量:
状态名 | 高 3 位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | |
SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作 进行赋值:
// c为旧值,ctlOf返回结果为新值
ctl.compareAndSet(c, ctlOf(targetStarte, workerCountOf(c)));
// rs为高3位代表线程池状态,wc为低29位代表线程个数,ctl是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }
构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程数目(最多保留的线程数)。
- maximumPoolSize:最大线程数目。
- keepAliveTime:生存时间,针对救急线程。
- unit:时间单位,针对救急线程。
- workQueue:阻塞队列。
- threadFactory:线程工厂,可以为线程创建时起名字。
- handler:拒绝策略。
jdk 线程池工作方式如下:
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
如果队列选择了有界队列,那么任务超过了队列大小时,会创建
maximumPoolSize - corePoolSize
数目的线程来救急。如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现:
- AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略。
- CallerRunsPolicy:让调用者运行任务。
- DiscardPolicy:放弃本次任务。
- DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之。
此外,一些著名的框架也有对拒绝策略做一些增强:
- Dubbo 的实现:在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题。
- Netty 的实现:是创建一个新线程来执行任务。
- ActiveMQ 的实现:带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略。
- PinPoint 的实现:它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略。
当高峰过去后,超过 corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。
工厂方法
为了防止新手使用的时候对上述的那些参数不了解,JDK Executors 类中也提供了众多工厂方法来创建各种用途的线程池。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点:
- 核心线程数 = 最大线程数(没有救急线程被创建),因此也无需超时时间。
- 阻塞队列是无界的,可以放任意数量的任务。
- 没有任务时核心线程不会自动结束。
该方法适用于任务量已知,相对耗时的任务。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点:
- 核心线程数是 0, 最大线程数是
Integer.MAX_VALUE
,救急线程的空闲生存时间是 60s,意味着:- 全部都是救急线程(60s 后可以回收)。
- 救急线程可以无限创建。
- 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取结果,别的线程是放不进去的。
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1 分钟后释放线程。适合任务数比较密集,但每个任务执行时间较短的情况。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
特点:
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作。
Executors.newSingleThreadExecutor()
线程个数始终为1,不能修改。FinalizableDelegatedExecutorService
应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法。Executors.newFixedThreadPool(1)
初始时为1,以后还可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用setCorePoolSize
等方法进行修改。
希望多个任务排队执行时可以用本方法创建线程池。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
提交任务
提交任务相关的方法有:
// 执行任务
void execute(Runnable command);
// 提交任务task,用返回值Future获得任务执行结果
<T> Future<T> submit(Callable<T> task);
// 提交tasks中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
// 提交tasks中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
// 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
// 提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
示例:
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
// 使用Callable执行需要返回值的任务
Future<String> future = pool.submit(() -> {
Thread.sleep(1000);
return "ok";
});
String result = future.get();
System.out.println(result);
}
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
List<Future<String>> futures = pool.invokeAll(Arrays.asList(
() -> {
log.debug("begin1...");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin2...");
Thread.sleep(500);
return "2";
}
));
futures.forEach(future -> {
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
停止
/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
示例:
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
// 执行任务
pool.submit(() -> {
Thread.sleep(2000);
log.debug("return 1...");
return "1";
});
// 终止线程池,该代码相对于上方的线程会优先执行,但是不会终止掉线程池中正在运行的任务
log.debug("shutdown...");
pool.shutdown();
}
任务调度线程池
在任务调度线程池功能加入之前,可以使用 java.util.Timer
来实现定时功能,Timer 的优点在于简单易用,但 由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
public static void main(String[] args) throws Exception {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
log.debug("task1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
log.debug("task2");
}
};
// 使用timer添加两个任务,希望它们都在1s后执行
// 但由于timer内只有一个线程来顺序执行队列中的任务,因此task1的延时影响了task2的延时
timer.schedule(task1, 1000);
timer.schedule(task2, 1000);
}
可以使用 ScheduledThreadPool 来改进:
public static void main(String[] args) throws Exception {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
// 执行延时任务
pool.schedule(() -> log.debug("task1"), 1, TimeUnit.SECONDS);
pool.schedule(() -> log.debug("task2"), 1, TimeUnit.SECONDS);
// 定时循环执行,主线程启动后1s开始打印task3,之后每5s打印一次,如果任务周期长于5s,则按照任务周期来
pool.scheduleAtFixedRate(() -> log.debug("task3"), 1, 5, TimeUnit.SECONDS);
// 定时循环执行,等待任务完全执行结束后,再等待5s开始下一个task4任务
pool.scheduleWithFixedDelay(() -> log.debug("task4"), 1, 5, TimeUnit.SECONDS);
}
正确处理线程池异常
对于线程池中任务的异常处理,有两种解决方式:
自己处理:
ExecutorService pool = Executors.newFixedThreadPool(1); pool.submit(() -> { try { log.debug("task..."); int i = 1 / 0; } catch (Exception e) { e.printStackTrace(); } });
使用 Future:
ExecutorService pool = Executors.newFixedThreadPool(1); Future<Boolean> future = pool.submit(() -> { log.debug("task..."); int i = 1 / 0; return true; }); log.debug("result: {}", future.get());
异步模式——工作线程
定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现 就是线程池,也体现了经典设计模式中的享元模式。
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那 么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)。
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率。例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率底下。分成服务员(线程池A)与厨师(线程池B)更为合理。
饥饿
固定大小线程池会有饥饿现象:
- 两个工人是同一个线程池中的两个线程。
- 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作:
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待。
- 后厨做菜:没啥说的,做就是了。
- 比如工人 A 处理了点餐任务,接下来它要等着工人 B 把菜做好,最后上菜。
- 但现在同时来了两个客人,这个时候工人 A 和工人 B 都去处理点餐了,这时没人做饭了,饥饿。
@Slf4j
public class JUCTest {
private static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
private static Random RANDOM = new Random();
private static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) throws Exception {
// 没有救急线程
ExecutorService pool = Executors.newFixedThreadPool(2);
// 连续调用两次的时候,两个线程会都卡在处理点餐这一步
order(pool);
order(pool);
}
private static void order(ExecutorService pool) {
pool.execute(() -> {
log.debug("处理点餐...");
Future<String> future = pool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程 池,例如:
@Slf4j
public class JUCTest {
private static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
private static Random RANDOM = new Random();
private static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) throws Exception {
// 没有救急线程,但是分成两个线程池来分别执行不同任务
ExecutorService waiterPool = Executors.newFixedThreadPool(2);
ExecutorService cookPool = Executors.newFixedThreadPool(2);
order(waiterPool, cookPool);
order(waiterPool, cookPool);
}
private static void order(ExecutorService waiterPool, ExecutorService cookPool) {
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> future = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
}
创建多大线程池合适
线程池的大小创建也有讲究:
- 过小会导致程序不能充分地利用系统资源、容易导致饥饿。
- 过大会导致更多的线程上下文切换,占用更多内存。
CPU 密集型运算
通常采用 cpu 核数 + 1
能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费。
IO 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下:
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式:
4 * 100% * 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式:
4 * 100% * 100% / 10% = 40
Tomcat 线程池
Tomcat 线程池工作流程如下:
flowchart LR LimitLatch --> Acceptor Acceptor --> SocketChannel1 --有读--> Poller Acceptor --> SocketChannel2 --有读--> Poller Poller --socketProcessor--> worker1 Poller --socketProcessor--> worker2 subgraph ide2 [Executor] worker1 worker2 end
- LimitLatch 用来限流,可以控制最大连接个数,类似 juc 中的 Semaphore (详见后续章节)。
- Acceptor 只负责接收新的 socket 连接。
- Poller 只负责监听 socket channel 是否有可读的 I/O 事件。
- 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理。
- Executor 线程池中的工作线程最终负责处理请求。
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同:
- 如果总线程数达到 maximumPoolSize:
- 这时不会立刻抛 RejectedExecutionException 异常。
- 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常。
Fork / Join
Fork / Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算。
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解。
Fork / Join 在分治的基础上加入了多线程,默认会创建与 cpu 核心数大小相同的线程池,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率。
使用
提交给 Fork / Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下 面定义了一个对 1~n 之间的整数求和的任务:
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
ForkJoinPool pool = new ForkJoinPool(4);
// MyTask(5) = 5 + MyTask(4) ...
System.out.println(pool.invoke(new MyTask(5)));
}
}
// 求1-n之间整数的和
class MyTask extends RecursiveTask<Integer> {
private int n;
public MyTask(int n) {
this.n = n;
}
@Override
protected Integer compute() {
// 终止条件
if (n == 1) {
return 1;
}
// 任务拆分 MyTask(n) = n + MyTask(n-1) ...
MyTask t1 = new MyTask(n - 1);
t1.fork(); // 让一个线程去执行此任务
Integer join = t1.join();// 获取任务结果
return n + join;
}
}
当然,我们可以对上述代码进行改进,让任务与任务之间的依赖关系减弱,进一步加强多核优势:
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
ForkJoinPool pool = new ForkJoinPool(4);
System.out.println(pool.invoke(new MyTask(1, 5)));
}
}
// 求1-n之间整数的和
class MyTask extends RecursiveTask<Integer> {
private int begin;
private int end;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
// 终止条件
if (begin == end) {
return begin;
}
if (end - begin == 1) {
return end + begin;
}
// 二分思想
int middle = (begin + end) / 2;
MyTask left = new MyTask(begin, middle);
MyTask right = new MyTask(middle + 1, end);
left.fork();
right.fork();
return left.join() + right.join();
}
}
AQS 原理
全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架。
特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁:
- getState:获取 state 状态。
- setState:设置 state 状态。
- compareAndSetState:cas 机制设置 state 状态。
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源。
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList。
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet。
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException):
- tryAcquire。
- tryRelease。
- tryAcquireShared。
- tryReleaseShared。
- isHeldExclusively。
获取锁:
// 如果获取锁失败
if (!tryAcquire(arg)) {
// 入队,可以选择阻塞当前线程
}
释放锁:
// 如果释放锁成功
if (tryRelease(arg)) {
// 让阻塞线程恢复运行
}
原理
早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不 够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。
AQS 要实现的功能目标:
- 阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire。
- 获取锁超时机制。
- 通过打断取消机制。
- 独占机制及共享机制。
- 条件不满足时的等待机制。
AQS 的基本思想其实很简单:
// 获取锁逻辑
while (state状态不允许获取) {
if (队列中还没有此线程) {
入队并阻塞
}
}
当前线程出队
// 释放锁逻辑
if (state状态允许了) {
恢复阻塞的线程(s)
}
/*
要点:
- 原子维护 state 状态
- 阻塞及恢复线程
- 维护队列
*/
其中:
- state 设计:
- state 使用 volatile 配合 cas 保证其修改时的原子性。
- state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想。
- 阻塞恢复设计:
- 早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到。
- 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没问题。
- park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细。
- park 线程还可以通过 interrupt 打断。
- 队列设计:
- 使用了 FIFO 先入先出队列,并不支持优先级队列。
- 设计时借鉴了 CLH 队列,它是一种单向无锁队列(实际上是一个双向链表,但是源代码只使用了其单向的功能,无锁,使用自旋,快速无阻塞)。
主要用到 AQS 的并发工具类有:ReentrantReadWriteLock、ReentrantLock、Semaphore。
使用
我们使用 AQS 实现不可重入锁:
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
}, "t2").start();
}
}
// 自定义锁(不可重入锁)
class MyLock implements Lock {
// 同步器类,独占锁
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 尝试把state从0改成1,如果成功,表示加锁成功
if (compareAndSetState(0, 1)) {
// 加锁成功,设置owner线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
// state由volatile保护,把state的赋值放在下面,保证上面的指令不会被重排
setState(0);
return true;
}
@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync = new MySync();
@Override // 加锁(不成功会进入等待队列等待)
public void lock() {
sync.acquire(1); // 内部会调用tryAcquire,不断重试加锁
}
@Override // 加锁,可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override // 尝试加锁,只试一次
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override // 尝试加锁,带超时时间
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override // 释放锁
public void unlock() {
sync.release(1);
}
@Override // 创建条件变量
public Condition newCondition() {
return sync.newCondition();
}
}
ReentrantLock
非公平锁实现原理
ReentrantLock 的构造器默认为非公平锁实现:
public ReentrantLock() {
sync = new NonfairSync();
}
非公平锁的加锁解锁流程如下,假设现在有一个线程 0 已经成功占用了锁,此时:
- state 会被置为 1。
- exclusiveOwnerThread 会被 t0 占用。
接下来来了一个线程 t1,尝试加锁:
- CAS 尝试将 state 由 0 改为 1,结果失败。
- 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败。
- 接下来进入 addWaiter 逻辑,构造 Node 队列:
- 每一个 Node 维护一个线程以及一个标记位,如果标记位为 0 表示线程正常,如果标记位为 -1 表示该节点有职责唤醒其后面第一个阻塞的线程。
- 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程。
而后,当前线程进入 acquireQueued 逻辑:
- acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞。
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败。
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false。
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败。
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true。
- 进入 parkAndCheckInterrupt,t1 park。
如果此时 t0 释放锁,进入 tryRelease 流程,如果成功:
- 设置 exclusiveOwnerThread 为 null。
- state = 0。
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程(唤醒阻塞线程):找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1。回到 Thread-1 的 acquireQueued 流程。
如果加锁成功(没有竞争),会设置:
- exclusiveOwnerThread 为 Thread-1,state = 1。
- head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread。
- 原本的 head 因为从链表断开,而可被垃圾回收。
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了。如果不巧又被 Thread-4 占了先:
- Thread-4 被设置为 exclusiveOwnerThread,state = 1。
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞。
可重入原理
在上锁的时候,判断 exclusiveOwnerThread 是否为当前线程,如果是,则允许锁重入。会将 state 的值进行递增。释放锁的时候会将 state 的值递减,直到 0 为止,此时意为成功释放锁。
可打断原理
在不可打断模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了。在可打断模式下,被打断会直接抛出异常,退出循环。
公平锁原理
当别的线程进来的时候,先检查 AQS 队列中是否有前驱节点, 没有才去竞争。
条件变量
每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject。
await 流程:
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程。创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部。
接下来进入 AQS 的 fullyRelease 流程(重入锁时 state 不一定只为 1),释放同步器上的锁。
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功。
signal 流程:
- 假设 Thread-1 要来唤醒 Thread-0。进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node。
- 执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0。
- Thread-1 释放锁,进入 unlock 流程,略。
ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用读写锁让读-读可以并发,提高性能。
读写锁应用
提供一个数据容器类,内部分别使用读锁保护数据的 read 方法,写锁保护数据的 write 方法:
@Slf4j
public class JUCTest {
public static void main(String[] args) throws Exception {
DataContainer container = new DataContainer();
writeAndRead(container);
}
// 读-读,不互斥
private static void readAndRead(DataContainer container) {
new Thread(container::read, "t1").start();
new Thread(container::read, "t2").start();
}
// 读-写,需要等待读完,才能写
private static void writeAndRead(DataContainer container) {
new Thread(container::read, "t1").start();
new Thread(container::write, "t2").start();
}
// 写-写,互斥
private static void writeAndWrite(DataContainer container) {
new Thread(container::write, "t1").start();
new Thread(container::write, "t2").start();
}
}
@Slf4j
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock(); // 读锁
private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); // 写锁
public Object read() {
r.lock();
try {
log.debug("读取");
Thread.sleep(1000);
return data;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
log.debug("释放读锁");
r.unlock();
}
}
public void write() {
w.lock();
try {
log.debug("写入");
} finally {
log.debug("释放写锁");
w.unlock();
}
}
}
注意事项:
- 读锁不支持条件变量。
- 重入时升级不支持:即持有读锁的情况下获取写锁,会导致获取写锁永久等待。
- 重入时降级支持:即持有写锁的情况下去获取读锁。
读写锁原理
读写锁用的是同一个 Sync 同步器,因此等待队列、state 也是同一个。不同的是写锁状态占了 state 的低 16 位,读锁使用的是 state 的高 16 位。
读锁在上锁的时候,因为有多个线程可并发,所以对于 state 高 16 位的修改会不断递增。同理,释放锁的时候会检查 state 高 16 位是否已经减为 0,如果不是,则表明还有线程在读。
StampedLock
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合戳使用:
加解读锁:
long stamp = lock.readLock();
lock.unlockRead(stamp);
加解写锁:
long stamp = lock.writeLock();
lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead()
方法(乐观读),读取完毕后需要做一次戳校验。如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
long stamp = lock.tryOptimisticRead();
// 验戳
if (!lock.validate(stamp)) {
// 锁升级
}
不过,StampedLock 不支持条件变量,也不可重入。
Semaphore
Semaphore,信号量,用来限制能同时访问共享资源的线程上限。(ReentrantLock 本质上还是独占,同一时间只能有一个线程来访问共享资源)
public static void main(String[] args) throws Exception {
// 创建Semaphore对象
Semaphore semaphore = new Semaphore(3); // 表示信号量最多访问3次
// 10个线程,当3个线程全部抢到信号量后,其他线程无法工作
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
// 获得信号量
semaphore.acquire();
log.debug("running...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放信号量
semaphore.release();
}
log.debug("end");
}).start();
}
}
应用
- 使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机 (非分布式)线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)。
- 用 Semaphore 实现简单连接池,对比享元模式下的实现(用wait notify),性能和可读性显然更好,注意下面的实现中线程数和数据库连接数是相等的。
class Pool {
// 连接池大小,保证大小固定
private final int poolSize;
// 连接对象数组
private Connection[] connections;
// 连接状态数组 0-空闲 1-繁忙,使用线程安全的数组实现
private AtomicIntegerArray status;
// 信号量准备
private Semaphore semaphore;
public Pool(int poolSize) {
this.poolSize = poolSize;
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.status = new AtomicIntegerArray(poolSize);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection();
}
}
// 借连接
public Connection borrow() {
try {
// 获取许可,往下方执行,可以获取连接
// 没有许可的线程就会在此等待
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
if (status.get(i) == 0) {
if (status.compareAndSet(i, 0, 1)) {
return connections[i];
}
}
}
// 不会执行到这里,会在上方返回
return null;
}
// 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
// Connection与连接在这里是唯一绑定的,可以不用加锁
status.set(i, 0);
semaphore.release(); // 释放信号量
break;
}
}
}
}
原理
初始时 state 为构造方法设定好的值,一旦有线程抢到信号量,则对 state 递减,直到为 0,表示信号量用完,无法再继续使用。
CountdownLatch
用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await()
用来等待计数归零,countDown()
用来让计数减一。(用 join 其实也可以实现,但是 join 偏底层,用起来不太方便)
public static void main(String[] args) throws Exception {
CountDownLatch latch = new CountDownLatch(3);
new Thread(() -> {
log.debug("begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 计数减一
latch.countDown();
}).start();
new Thread(() -> {
log.debug("begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 计数减一
latch.countDown();
}).start();
new Thread(() -> {
log.debug("begin...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 计数减一
latch.countDown();
}).start();
// 主线程等待,等计数为0就恢复运行
latch.await();
log.debug("end...");
}
结合线程池使用:
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(4);
CountDownLatch latch = new CountDownLatch(3);
// 前三个线程执行任务,最后一个线程等待,后汇总
pool.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 计数减一
latch.countDown();
});
pool.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 计数减一
latch.countDown();
});
pool.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 计数减一
latch.countDown();
});
pool.submit(() -> {
log.debug("begin...");
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("end...");
});
}
要注意的是:CountdownLatch 的计数不能被增加,也就是说,一旦创建出来一个 latch,就只能让其减到 0,没办法进行对象重用。
如果需要重用,需要使用接下来要讲的 CyclicBarrier,循环栅栏。
CyclicBarrier
循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置计数个数,每个线程执行到某个需要 “同步” 的时刻调用 await()
方法进行等待,当等待的线程数满足计数个数时,继续执行:
public static void main(String[] args) throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(2);
// 循环两遍,当计数变为0后,再调用await,会恢复成初值,可重用
CyclicBarrier barrier = new CyclicBarrier(
2,
() -> { // 这个任务是当前两个任务执行完成之后才执行的
log.debug("task1 task2 finish...");
}
);
pool.submit(() -> {
log.debug("task1 begin...");
try {
Thread.sleep(500);
barrier.await(); // 2 - 1 = 1 != 0,阻塞
log.debug("task1 end...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
pool.submit(() -> {
log.debug("task2 begin...");
try {
Thread.sleep(1000);
barrier.await(); // 1 - 1 = 0 == 0,两个线程继续运行
log.debug("task2 end...");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
pool.shutdown();
}
线程安全集合类
分类
线程安全集合类可以分为三大类:
- 遗留的线程安全集合:Hashtable、Vector。它们的方法直接用 synchronizd 修饰,保证线程安全,但是性能较差。
- 使用 Collections 修饰的集合,如:synchronizdCollection、synchronizdList、synchronizdMap、synchronizdSet…用于将不安全的集合包装成线程安全的。装饰器模式,本质上还是用了 synchronizd。
- juc 包提供的集合类。可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent:
- Blocking 大部分实现基于锁,并提供用来阻塞的方法。
- CopyOnWrite 之类容器修改开销相对较重。
- Concurrent 类型的容器:内部很多操作使用 cas 优化,一般可以提供较高吞吐量。遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的。求大小弱一致性,size 操作未必是 100% 准确。读取弱一致性。
ConcurrentHashMap
HashMap 并发死链
HashMap 并发死链问题产生于 jdk 8 之前。在 jdk 8 之前,对于哈希冲突而在一个哈希桶里产生的链表,如果有新元素加入,采用的是头插法。假设现在有两个线程都对同一个 HashMap 进行增元素的操作,而这增加的元素刚好触发了 HashMap 的扩容机制,在这种情况下,两个线程均会调用 HashMap 的扩容方法。假设第一个线程马上执行好了扩容。第二个线程还在执行扩容程序的时候,其中的引用的内容就会被第一个线程修改,进而导致当第二个线程进行元素重排时,有可能重排出一个循环链表,进而导致遍历链表时进入循环,引发死链问题。
究其原因,是因为在多线程环境下使用了非线程安全的 map 集合。JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据)。
原理
重要字段:
// 默认为 0
// 当初始化时, 为 -1
// 当扩容时, 为 -(1 + 扩容线程数)
// 当初始化或扩容完成后,为 下一次的扩容的阈值大小
private transient volatile int sizeCtl;
// 整个 ConcurrentHashMap 就是一个 Node[]
static class Node<K,V> implements Map.Entry<K,V> {}
// hash表
transient volatile Node<K,V>[] table;
// 扩容时的新hash表
private transient volatile Node<K,V>[] nextTable;
// 扩容时如果某个 bin 迁移完毕, 用 ForwardingNode 作为旧 table bin 的头结点
static final class ForwardingNode<K,V> extends Node<K,V> {}
// 用在 compute 以及 computeIfAbsent 时, 用来占位, 计算完成后替换为普通 Node
static final class ReservationNode<K,V> extends Node<K,V> {}
// 作为 treebin 的头节点, 存储 root 和 first
static final class TreeBin<K,V> extends Node<K,V> {}
// 作为 treebin 的节点, 存储 parent, left, right
static final class TreeNode<K,V> extends Node<K,V> {}
重要方法:
// 获取 Node[] 中第 i 个 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i)
// cas 修改 Node[] 中第 i 个 Node 的值, c 为旧值, v 为新值
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)
// 直接修改 Node[] 中第 i 个 Node 的值, v 为新值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v)
构造器分析:
// 参数:初始容量,扩容因子(3/4,表示元素占总容量3/4时扩容),并发度
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
// 这里只计算了大小,但没有创建真正的table,是懒惰初始化
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// tableSizeFor保证计算出来的大小是2^n,即16、32、64...
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
get 流程:
// 这里的get方法并没有用锁
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// spread方法能确保返回结果是正数
int h = spread(key.hashCode());
// table已创建并且里面有元素才开始找
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头节点已经是要查找的key
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// hash为负数表示该bin在扩容中或是treebin,这时调用find方法来查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 正常遍历链表,直到找到对应的key
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
put 流程(以下数组简称 table,链表简称 bin):
- put 的时候才会创建 table(懒惰式创建,在实际创建的时候,调用 cas 把 sizeCtl 改为 -1,如果修改不成功,表示有其他线程正在创建,则当前线程 yield,直至 table 创建完毕。),使用 cas,无需 synchronized。
- 如果没有哈希冲突,则创建链表头。添加链表头使用了 cas,无需 synchronized。
- 如果碰到这个 map 正在扩容(判断出当前节点是一个 ForwardingNode),则当前线程会帮忙扩容。
- 如果碰到哈希冲突,对当前链表的头节点加 synchronized 锁。针对于链表,尾插法。如果是红黑树,调用红黑树的添加方法。
- 添加完节点后,如果此时 map 桶长度达到 64 并且链表长度大于 8,这个时候才会把链表转化成红黑树。
size 计算流程:
- size 的计算实际发生在 put,remove 改变集合元素的操作之中。
- 如果没有竞争发生,向 baseCount 累加计数。
- 如果有竞争发生,新建 counterCells(初始有两个 cell),向其中一个 cell 累加计数。如果竞争比较激烈,会创建新的 cell 来累加计数。
- 注意:因为是多个线程来访问这个 size 的计算,所以,实际上的 size 计算在多线程环境下是有误差的。
LinkedBlockingQueue
原理
基本的入队出队:
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
/**
* 下列三种情况之一
* - 真正的后继节点
* - 自己, 发生在出队时
* - null, 表示是没有后继节点, 是最后了
*/
Node<E> next;
Node(E x) { item = x; }
}
}
入队:队列中维护一个 Dummy 节点(类似于头节点的作用),一个头指针,一个尾指针。入队就是在两个指针之间把节点插入。
出队:
Node<E> h = head;
Node<E> first = h.next;
// 让Dummy节点自指,出队,顺便可以被垃圾回收
h.next = h; // help GC
head = first;
// 把Dummy后的第一个节点的值返回,然后置空,这个节点就变成了新的Dummy节点
E x = first.item;
first.item = null;
return x;
加锁分析:
LinkedBlockingQueue 加了两把锁:
- 用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行。用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行。消费者与消费者仍然串行,生产者与生产者仍然串行。
- 当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争。
- 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争。
- 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞。
ArrayBlockingQueue 对比:
- Linked 支持有界,Array 强制有界。
- Linked 实现是链表,Array 实现是数组。
- Linked 是懒惰的,而 Array 需要提前初始化 Node 数组。
- Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的。
- Linked 两把锁,Array 一把锁。
ConcurrentLinkedQueue
ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像:
- 也是两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行。
- dummy 节点的引入让两把锁将来锁住的是不同对象,避免竞争。
- 只是这锁使用了 cas 来实现。
事实上,ConcurrentLinkedQueue 应用还是非常广泛的。例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了 ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用。
CopyAndWriteArrayList
CopeAndWriteArraySet 是它的马甲。底层实现采用了写入时拷贝的思想。增删改操作会将底层数组拷贝一份,更 改操作在新数组上执行,这时不影响其它线程的并发读,读写分离。 适合读多写少的场景。
这样一来,就会有弱一致性问题。可能读到旧数据。不过,一致性和并发性生来矛盾,二者需要根据业务场景进行权衡。