一、线程与进程的区别
- 进程是系统进行资源分配的最小单元,一个进程由多个线程组成
- 线程是cpu调度执行的最小单元,本身不拥有资源(共享所在进程的资源)
二、并发与并行
并行:在同一时刻同时运行多个程序,强调同时,多出现在多核情况下
并发:多个程序顺序交替进行,同一时刻只有一个程序在运行,单核下都是并发
因此,并不能说多线程就一定是并发,在多核环境下也会存在并行的情况,只不过cpu数量是有限的,大多数还是并发的情况,因此多线程主要是研究并发的情况。
三、多线程的作用
1)发挥多核cpu的优势,能够同时处理多个任务,充分利用了系统资源
2)将复杂的程序拆分成多块,把互不依赖的部分派给多个线程去执行,达到异步的目的,减少程序的阻塞,提高程序运行的效率
四、创建多线程的方法
Java中多线程运行原理
1、 第一种线程的开启方式: Thread
- 创建类, 继承Thread
- 重写run()方法
- 将要执行的代码, 放到run方法中
- 创建Thread子类的对象
-
调用start()方法开启线程
public class Demo01_开启多线程的第一种方式 { public static void main(String[] args) { // 开启线程 // 4. 创建Thread子类的对象 MyThread mt = new MyThread(); // 5. 调用start()方法开启线程 mt.start(); for (int i = 0; i < 1000; i++) { System.out.println("BB"); } } } /我是在同一个java文件中创建的 // 1. 创建一个类, 继承Thread class MyThread extends Thread { // 2. 重写run()方法 @Override public void run() { // 3. 将要执行的代码, 放到run方法中 for (int i = 0; i < 1000; i++) { System.out.println("aaaaaaaaaaaaaa"); } } }
2. 第二种线程的开启方式: Runnable
- 创建类, 实现Runnable接口
- 重写run()方法
- 将要执行的代码放到run()方法中
- 创建Runnable接口的实现类对象
- 创建Thread类的对象, 将Runnable接口的实现类对象放到构造方法参数中
-
调用Thread的start()方法开启线程
public class Demo02_开启多线程的第二种方式 { public static void main(String[] args) { // 4. 创建Runnable接口的实现类对象 MyRunnable mr = new MyRunnable(); // 5. 创建Thread类的对象, 将Runnable接口的实现类 对象放到构造方法参数中 Thread t = new Thread(mr); // 6. 调用Thread的start()方法开启线程 t.start(); for (int i = 0; i < 1000; i++) { System.out.println("BB"); } } } // 1. 创建类, 实现Runnable接口 class MyRunnable implements Runnable { // 2. 重写run()方法 @Override public void run() { // 3. 将要执行的代码放到run()方法中 for (int i = 0; i < 1000; i++) { System.out.println("aaaaaaaaaaaaaa"); } } }
3.同一个线程类开启多条线程
(1) Thread
public class Demo03_同一个线程类开多条线程_Thread {
public static void main(String[] args) {
// 4. 创建Thread子类的对象
MyThread mt1 = new MyThread();
// 5. 调用start方法开启线程
mt1.start();
MyThread mt2 = new MyThread();
mt2.start();
MyThread mt3 = new MyThread();
mt3.start();
}
}
// 1. 创建类, 继承Thread
class MyThread extends Thread {
// 2. 重写run方法
@Override
public void run() {
// 3. 将要执行的代码放到run方法中
for (int i = 1; i <= 10; i++) {
System.out.println(i);
}
}
}
(2) Runnable
public class Demo04_同一个线程类开启多条线程_Runnable {
public static void main(String[] args) {
// Runnable接口的实现类对象, 只需要创建一次
MyRunnable mr = new MyRunnable();
Thread t1 = new Thread(mr);
t1.start();
Thread t2 = new Thread(mr);
t2.start();
Thread t3 = new Thread(mr);
t3.start();
}
}
class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 1; i <= 10; i++) {
System.out.println(i);
}
}
}
4.使用Callable和Future创建线程
1】创建Callable接口的实现类,并实现call()方法,然后创建该实现类的实例
2】使用FutureTask类来包装Callable对象,该FutureTask对象封装了Callable对象的call()方法的返回值
3】使用FutureTask对象作为Thread对象的target创建并启动线程(因为FutureTask实现了Runnable接口)
4】调用FutureTask对象的get()方法来获得子线程执行结束后的返回值
public class TestCall {
public static void main(String[] args) {
//创建线程
MyCall myCall = new MyCall();
//创建future对象,并传入线程对象
FutureTask<Integer> futureTask = new FutureTask<Integer>(myCall);
//启动线程
new Thread(futureTask).start();
try {
//阻塞等待线程执行完毕获得返回值
System.out.println(futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class MyCall implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 5;
}
}
需要注意的是当我们调用futureTask.get()获取线程的返回值时,会导致阻塞,需要等待线程执行完,只有拿到返回值后futureTask.get()方法才会继续向下执行。
4.方法的对比
由于Runnable和Callable的方式基本相似,因此可以把这两种方式归为一 种,与继承Thread类的方法相比优势如下:
1、线程只是实现Runnable或实现Callable接口,还可以继承其他类。
2、这种方式下,多个线程可以共享一个target对象,非常适合多线程处理同一份资源的情形。
3、但是编程稍微复杂,如果需要访问当前线程,必须调用Thread.currentThread()方法。
注:一般推荐采用实现接口的方式来创建多线程
五、多线程的安全问题
1、产生线程安全的原因
- 多线程环境下拥有共享数据
-
多条语句操作共享数据
public class Happy12306 { public static void main(String[] args) { Runnable web12306 = new Web12306("12306", 50); new Passenger(web12306, "张三").start(); new Passenger(web12306, "李四").start(); new Passenger(web12306, "王五").start(); new Passenger(web12306, "赵六").start(); } } /** * 乘客 */ class Passenger extends Thread { public Passenger(Runnable target, String name) { super(target, name); } } /** * 车票网站 */ lass Web12306 implements Runnable { private String name; private int available; public Web12306(String name, int available) { this.name = name; this.available = available; } @Override public void run() { while (true) { try { Thread.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } //进行抢票 if (available > 0) { System.out.println(Thread.currentThread().getName() + "买到第:" + available + "张票"); available -= 1; } else { break; } } }
每个线程中有三条语句操作共享数据,因此导致了多个人买到同一张票的情况,那么我们应该如何处理线程安全的问题呢?
2、解决线程安全问题的四种方法
在解决线程安全问题之前,我们先来说一下并发操作的三个性质:
原子性:即不可被打断的一个整体
可见性:大家都知道,计算机在执行程序时,每条指令都是在CPU中执行的,程序运行过程中的临时数据是存放在主存(物理内存)当中的,这时就存在一个问题,由于CPU执行速度很快,而从内存读取数据和向内存写入数据的过程跟CPU执行指令的速度比起来要慢的多,因此如果任何时候对数据的操作都要通过和内存的交互来进行,会大大降低指令执行的速度。因此在CPU里面就有了高速缓存。可见性就是保证内存中的数据与缓存中的数据一致。
int i = 0;
//线程1执行的代码
i = i + 10;
//线程2执行的代码
j = i;
假若执行线程1的是CPU1,执行线程2的是CPU2。由上面的分析可知,当线程1执行 i =i+10这句时,会先把i的初始值加载到CPU1的高速缓存中,然后计算为10,那么在CPU1的高速缓存当中i的值变为10了,却没有立即写入到主存当中。
此时线程2执行 j = i,它会先去主存读取i的值并加载到CPU2的缓存当中,注意此时内存当中i的值还是0,那么就会使得j的值为0,而不是10.
这就是可见性问题,线程1对变量i修改了之后,线程2没有立即看到线程1修改的值。
那么保证可见性就是一个线程修改了这个变量的值,其他线程能够立即看得到修改后的值。
顺序性:虚拟机底层拥有happen-before模型,会对java代码进行优化,对不产生相互依赖的代码进行指令重排序
int i = 0;
boolean flag = false;
i = 1; //语句1
flag = true; //语句2
此时我们就不能保证语句1一定在语句2之前执行。而指令重排序也有可能导致线程不安全的问题。
1)同步代码块
方法:将所有对共享数据进行操作的代码放在同步代码块中
synchronized (Web12306.class) {
//进行抢票
if (available > 0) {
System.out.println(Thread.currentThread().getName() + "买到第:" + available + "张票");
available -= 1;
} else {
break;
}
}
将三条操作共享数据的代码放在同步带吗块中,就能保证多线程的安全。
synchronized的作用
synchronized可以保证并发操作的三性:可见性,原子性和顺序性。
显而易见,同步锁保证了代码块的原子性
而对于可见性,jvm对synchronized有如下两条规定:
1)线程解锁前,必须把共享变量的最新值刷新到主内存中
2)线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新获取最新的值
这样其他抢到锁的线程读取到的就是最新的值,这样就保证了可见性。
而对于顺序性,虽然synchronized并不能禁止jvm对代码的优化,但是由于重排序都是在同一个线程内进行的,因此对于同步代码块来说,对外表现出了顺序性
简单解析锁对象的作用
任何对象都有一个关联的monitor(监视器)对象,synchronize的加锁和解锁过程是借助monitor对象来实现,在同步代码块开始和结束的地方会插入monitorenter和monitorexit指令控制锁的获得和释放,同步代码块中的这一段代码可以称之为监视器区域,监视器保证同一时刻只有一个线程能够访问监视器区域。当线程执行完同步代码块,执行了monitorexit指令之后,监视器区域就没有线程了,这个时候其他线程就能访问这个区域。
2)同步方法
同步方法分为静态同步方法和非静态同步方法
非静态同步方法, 锁对象是this
静态的同步方法, 锁对象是当前类的字节码文件对象
3)Lock锁
首先需要创建一个锁对象,一般创建Lock的实现类ReentrantLock,即可重入锁
然后在同步代码块的前面调用锁对象的方法获取锁
获得锁的四种方法:
- lock():获得锁,如果锁被占用则等待
- tryLock():尝试获得锁,如果成功获得则返回true,获取失败则返回false,并不会导致阻塞
- tryLock(long time, TimeUnit unit):在一定时间内尝试获得锁,如果超过时限还没得到锁,则返回false
- lockInterruptibly():获取不到锁会进入等待,但是等待中的线程能够响应中断信号,比如thread2在等待中,如果其他线程调用了thread2.interrupt()则会中断thread2的等待
public class TestTryLock implements Runnable { private Lock lock = new ReentrantLock();
//需要参与同步的方法
@Override
public void run() {
/*
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "获得了锁");
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放了锁");
lock.unlock();
}
*/
if (lock.tryLock()) {
try {
System.out.println(Thread.currentThread().getName() + "获得了锁");
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "释放了锁");
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + "有人占着锁,我就不要啦");
}
}
public static void main(String[] args) {
TestTryLock lockTest = new TestTryLock();
//线程1
Thread t1 = new Thread(lockTest, "t1");
//线程2
Thread t2 = new Thread(lockTest, "t2");
t1.start();
t2.start();
} }
第一段注释的打印结果
t1获得了锁
t1释放了锁
t2获得了锁
t2释放了锁
之后的结果(多试几次会出现)
t1获得了锁
t1释放了锁
t2有人占着锁,我就不要啦
其实就是lock的几个方法的介绍
4)Volatile
Volatile主要解决的两个问题:可见性与顺序性
可见性:Volatile修饰的变量在执行写操作时会发出一个lock指令,这个指令会导致两个结果:
1.把当前线程中的共享变量从缓存刷到系统内存
2.把其他线程缓存了相同内存地址的数据置为无效
在早期的cpu中,这个lock指令会锁住总线,总线是cpu与其他硬件进行信息交换的通道,虽然此时cpu能独占共享内存,但这会导致其他cpu无法访问所有内存,这样会降低系统运行效率,增大开销。因此在现在的cpu中加入了缓存一致性协议,通过这个协议,cpu能够通过嗅探来感知主存中的共享变量与缓存中的值是否一致,如果检测到共享变量被修改,那么会将自己的缓存行置为无效,下次再读的时候会重新从系统内存中加载,保证了读取的值是最新的,是可见的。
顺序性:Volatile通过对变量的操作添加内存屏障来防止指令重排序
5)synchronized与Lock的比较
类别 | synchronized | Lock |
---|---|---|
存在层次 | Java的关键字,在jvm层面上 | 是一个类 |
锁的释放 | 1、以获取锁的线程执行完同步代码,释放锁 2、线程执行发生异常,jvm会让线程释放锁 | 在finally中必须释放锁,不然容易造成线程死锁 |
锁的获取 | 假设A线程获得锁,B线程等待。如果A线程阻塞,B线程会一直等待 | 分情况而定,Lock有多个锁获取的方式,因此在获取锁的时候可以不用一直等待 |
锁状态 | 无法判断 | 可以判断 |
锁类型 | 可重入 不可中断 非公平 | 可重入 可中断 可公平(两者皆可) |
性能 | 竞争不激烈时两者相当 | 竞争很激烈时,由于Lock可以不用等待,因此性能远远优于synchronized |
六、异步与同步
同步:顾名思义,就是按照顺序一步一步进行,执行完一个再执行下一个,需要等待、协调运行。同步的典型实现方式就是单线程
异步:异步就是彼此独立,在等待某事件的过程中继续做自己的事,不需要等待这一事件完成后再工作。多线程就是实现异步的一个方式。异步是让调用方法的主线程不需要同步等待另一线程的完成,从而可以让主线程干其它的事情。比如我们需要拿到一个方法的返回值,那么我们可以将这个方法的执行交给另一个线程去完成,主线程只需要开启调用这个方法的线程,然后主线程能够继续执行接下来的代码,等过一段时间之后再去拿之前那个方法的返回值。
但是我们要清楚,多线程并不等同于异步,多线程只是通过代码逻辑达到异步的效果,而真正的异步是底层硬件的一个功能。
七、线程的状态
public enum State {
/**
* 刚创建对象,没有start()的线程状态
*/
NEW,
/**
* 可运行线程的线程状态。处于可运行状态的线程正在Java虚拟机中执行,但它可能正在等待来自操作系统(如处理器)的其他资源
*/
RUNNABLE,
/**
* 线程处于阻塞状态。在进入或者重新进入synchronized代码块/方法时,等待monitor lock的一种状态
*/
BLOCKED,
/**
* 线程处于等待状态。,由于调用以下方法之一,线程会处于等待状态:
* Object.wait() 没有超时时间
* Thread.join() 没有超时时间
* LockSupport.park()
*/
WAITING,
/**
* 具有指定等待时间的等待状态。调用以下方法之一,在指定的等待时间内,使线程处于等待状态:
* Thread.sleep
* Object.wait(long) 有超时时间
* Thread.join(long) 有超时时间
* LockSupport.parkNanos(Object blocker, long nanos)
* LockSupport.parkUntil(Object blocker, long deadline)
*/
TIMED_WAITING,
/**
* 终止状态。 线程已完成执行
*/
TERMINATED;
}
Thread.yield(),一定是当前线程调用此方法,当前线程放弃获取的cpu时间片,由运行状态变会可运行状态,让OS(operating system)再次选择线程。作用:让相同优先级的线程轮流执行,但并不保证一定会轮流执行。实际中无法保证yield()达到让步目的,因为让步的线程还有可能被线程调度程序再次选中。Thread.yield()不会导致阻塞,且不会释放锁。
/****
* 当前线程让出cpu使用权进入可运行状态,但是不会释放同步锁
*/
public class TestYield {
public static void main(String[] args) {
new Thread(new Thread3()).start();
new Thread(new Thread4()).start();
}
}
class Thread3 implements Runnable {
@Override
public void run() {
synchronized (TestYield.class) {
for (int i = 1; i < 100; i++) {
System.out.println(i);
if (i == 50) {
Thread.yield();
}
}
}
}
}
class Thread4 implements Runnable {
@Override
public void run() {
synchronized (TestYield.class) {
for (int i = 1000; i < 1100; i++) {
System.out.println(i);
}
}
}
}
t.join()/t.join(long millis),当前线程里调用其它线程1的join方法,当前线程阻塞,但不释放对象锁,直到线程1执行完毕或者millis时间到,当前线程进入可运行状态。
/****
* 当前线程进入等待状态,等待其他线程执行完或者时间到,退出等待状态重新 进入可执行状态,但不会释放同步锁
*/
public class TestJoin {
public static void main(String[] args) {
Thread thread2 = new Thread(new Thread2());
Thread thread1 = new Thread(new Thread1(thread2));
thread1.start();
thread2.start();
}
}
class Thread1 implements Runnable {
private Thread thread;
public Thread1(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
synchronized (TestJoin.class) {
for (int i = 1; i < 100; i++) {
System.out.println(i);
if (i == 50) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
class Thread2 implements Runnable {
@Override
public void run() {
synchronized (TestJoin.class) {
for (int i = 1000; i < 1100; i++) {
System.out.println(i);
}
}
}
}
八、ThreadLocal
1、特点
ThreadLocal是一个线程内部的数据存储类,通过它可以在指定的线程中存储数据,对数据存储后,只有在这个线程中才可以获取到存储的数据,对于其他线程来说是无法获取到这个数据的。
2、使用场景
1)一般来说,当某些数据是以线程为作用域并且不同线程具有不同的数据副本的时候,就可以考虑采用ThreadLocal。通过它我们可以达到数据绑定与数据隔离的目的。
2)复杂逻辑下的对象传递。比如我们的线程中的代码非常复杂,具体表现为多个方法之间的调用错综复杂,但是多个方法需要用到同一个对象,平常我们可以通过两种方法来实现这个目的:将对象作为方法的参数进行传递、在线程中构造一个静态变量来共享对象。第一种方法显然是不合适的,因为如果每个方法入口都添加一个参数来进行对象传递,会让我们程序看起来很糟糕。第二种方法看起来可行,但是如果我们有10个线程,那我们是不是需要创建10个静态变量来保存对象,但是使用ThreadLocal无论多少个线程,我们都只需要创建一个ThreadLocal对象。
3、使用举例
public class TestThreadLocal {
private static ThreadLocal<String> threadLocal1;
private static ThreadLocal<Integer> threadLocal2;
public static void main(String[] args) {
threadLocal1 = new ThreadLocal<>();
threadLocal2 = new ThreadLocal<>();
new Thread(new Runnable() {
@Override
public void run() {
threadLocal1.set("A");
threadLocal2.set(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +":" + threadLocal1.get());
System.out.println(Thread.currentThread().getName() +":" + threadLocal2.get());
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
threadLocal1.set("B");
threadLocal2.set(2);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +":" + threadLocal1.get());
System.out.println(Thread.currentThread().getName() +":" + threadLocal2.get());
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() +":" + threadLocal1.get());
System.out.println(Thread.currentThread().getName() +":" + threadLocal2.get());
}
}).start();
} }
//结果
Thread-2:null
Thread-2:null
Thread-0:A
Thread-0:1
Thread-1:B
Thread-1:2
九、线程池
1、线程池的种类
线程池分为六个扩展类,分别通过Executors下的六种静态方法进行创造:
1. 固定线程池
ExecutorService service1 = Executors.newFixedThreadPool(5);
该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行,若没有,则新的任务会被暂存在一个任务队列(默认无界队列 int 最大数)中,待有线程空闲时,便处理在任务队列中的任务。
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
2. 单例线程池
ExecutorService service3 = Executors.newSingleThreadExecutor();
该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列(默认无界队列 int 最大数)中,待线程空闲,按先入先出的顺序执行队列中的任务。
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
3. 缓存线程池
ExecutorService service2 = Executors.newCachedThreadPool();
该方法返回一个可根据实际情况调整线程数量的线程池,线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程,所有线程均在工作,如果有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
4. 任务调用线程池
ExecutorService service4 = Executors.newScheduledThreadPool(2);
该方法也返回一个 ScheduledThreadPoolExecutor 对象,该线程池可以指定线程数量。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new Executors.DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
2、线程池的创建及其参数意义
//创建线程池,实际上最终都是直接或者间接的调用ThreadPoolExecutor的构造方法来创建的线程池 ExecutorService pool = Executors.newFixedThreadPool(5);
//而ThreadPoolExecutor的有多个构造方法,最终都是调用含有7个参数的构造函数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
接下来对一些重要的参数进行解释:
首先我们要知道,线程池中的线程分两种:核心线程与工作线程
corePoolSize:顾名思义,其指代核心线程的数量。当新创建一个线程池,里面是没有线程的,当有任务提交到线程池时,线程池会创建一个核心线程去执行这个任务,接下来每提交一个任务都会创建一个新的核心线程来执行任务,即使有空闲的核心线程,线程池也会新创建核心线程来执行任务,知道核心线程的数量达到corePoolSize的大小,这时就不会再创建核心线程,因此corePoolSize就等于线程池最大的核心线程数量。 如果你想要提前创建并启动所有的核心线程,可以调用线程池的prestartAllCoreThreads()方法。 workQueue:阻塞队列。阻塞队列分三种: 1)有界任务队列ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小,创建之后大小就固定了; 2)无界任务队列LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE,队列大小可以变化,因此可以看做无限大; 3)同步移交队列synchronousQueue:如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。 阻塞队列主要用来存储提交的任务,不同的线程池使用的阻塞队列不同。
- maximumPoolSize:其指代线程池允许创建的最大线程数,指核心线程+工作线程的最大数量。如果队列满了,并且已创建的线程数小于maximumPoolSize,则线程池会再创建新的线程执行任务。所以只有队列满了的时候,这个参数才有意义。因此当你使用了无界任务队列的时候,队列永远不会满,这个参数就没有意义了。
- keepAliveTime:其指代线程活动保持时间,即当线程池的工作线程空闲后,保持存活的时间,当工作线程空闲时间超过keepAliveTime就会被踢出线程池,但是核心线程不会受到这个参数的影响。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率,不然线程刚执行完一个任务,还没来得及处理下一个任务,线程就被终止,而需要线程的时候又再次创建,刚创建完不久执行任务后,没多少时间又终止,会导致资源浪费。 注意:这里指的是核心线程池以外的线程。还可以设置allowCoreThreadTimeout = true这样就会让核心线程池中的线程有了存活的时间。
- TimeUnit:keepAliveTime的单位,秒钟、分钟、小时等。
- ThreadFactory:线程工厂,所有的线程都是通过这个工厂来创建的,我们可以通过这个工厂来给线程赋予一些更有意义的名字。
- RejectedExecutionHandler:其指代拒绝执行程序,可以理解为饱和策略:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4种策略: AbortPolicy:直接抛出异常RejectedExecutionException。 CallerRunsPolicy:只用调用者所在线程来运行任务,即由调用 execute方法的线程执行该任务。 DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。 DiscardPolicy:不处理,丢弃掉,即丢弃且不抛出异常。
这7个参数共同决定了线程池执行一个任务的策略:
当一个任务被添加进线程池时:
-
线程数量未达到 corePoolSize,则新建一个线程(核心线程)执行任务
-
线程数量达到了 corePoolSize,则将任务移入队列等待,空闲的核心线程会自动按顺序从队列中取出任务执行
-
队列已满,新建线程(非核心线程)执行任务
-
队列已满,总线程数又达到了maximumPoolSize,就会由RejectedExecutionHandler抛出异常
分割—————————-
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//第一步:如果线程数量小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//则启动一个核心线程执行任务
return;
c = ctl.get();
}
//第二步:当前线程数量大于等于核心线程数,加入任务队列,成功的话会进行二次检查
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//如果发现线程池突然停止运行,则将刚刚添加到队列的任务移除,并且调用拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
//发现线程池里没有线程,启动非核心线程执行,注意这里任务是null,其实里面会去取任务队列里的任务执行
addWorker(null, false);
}
//第三步:加入不了队列(即队列满了),尝试启动非核心线程
else if (!addWorker(command, false))
//如果启动不了非核心线程执行,说明到达了maximumPoolSize,会调用拒绝策略
reject(command);
}
execute方法分为三个if判断语句,分别执行了三个逻辑部分:
1)如果 当前活动线程数 < 指定的核心线程数,则调用addWorker(command, true)方法创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);
2)如果 当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;
3)如果 当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);
从代码中我们也可以看出,即便当前活动的线程有空闲的,只要这个活动的线程数量小于设定的核心线程数,那么依旧会启动一个新线程来执行任务。也就是说不会去复用任何线程。在execute方法里面我们没有看到线程复用的影子,那么我们来看看启动线程的addWorker方法有什么奥秘:
分割—————————-
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//前面的一个for循环目的很简单,是为workerCount加一。至于为什么代码写了这么长,是因为线程池的状态在不断变化,并发环境下需要保证变量的同步性。外循环判断线程池状态、任务非空和队列非空,内循环使用CAS机制保证workerCount正确地递增。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 新建一个Worker对象,将任务传给这个worker对象
final Thread t = w.thread;//从worker对象中取出一个线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动刚刚从worker对象中拿出来的thread线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
所以我们可以看到,这个方法主要做的事情就是创建一个新的worker对象,并将任务传递给这个worker对象,然后启动这个worker对象内置的线程,那么这个worker对象是如何创建线程并执行我们的任务的呢?接下来看看worker对象的源码:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
最重要的构造方法:
Worker(Runnable firstTask) { // worker本身实现了Runnable接口
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask; // 持有外部传进来的runnable任务
//通过ThreadFactory新建了一个线程对象,并把自身这个worker对象给了thread,一旦该thread执行start方法,就会执行worker的run方法
this.thread = getThreadFactory().newThread(this); }
在addWorker方法中执行的t.start会去执行worker的run方法:
public void run() {
runWorker(this);
}
run方法又执行了ThreadPoolExecutor的runWorker方法,把自身这个worker对象传入:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 取出worker的runnable任务
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环不断的判断任务是否为空,当第一个判断为false的时候,即task为null,这个task啥时候为null呢?
// 要么w.firstTask为null,还记得我们在execute方法第二步的时候,执行addWorker的时候传进来的runnable是null吗?
// 要么是执行了一遍while循环,在下面的finally中执行了task=null;
// 或者执行第二个判断,一旦不为空就会继续执行循环里的代码。
// 那么最关键的地方来了,我们万万没有想到,这个getTask()是一个阻塞的方法
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run(); // 任务不为空,就会执行任务的run方法,也就是runnable的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null; // 执行完成置null,继续下一个循环
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这个runWorker方法首先从传入的worker对象中取出我们最开始传入的任务,然后经过一系列的判断最终通过task.run()执行了我们任务中的run方法,假如我们不考虑此方法里面的while循环的第二个判断,在我们的线程开启的时候,顺序执行了runWorker方法后,当前worker的run就执行完成了。
既然执行完了那么这个线程也就没用了,只有等待虚拟机销毁了。那么回顾一下我们的目标:Java线程池中的线程是如何被重复利用的?好像并没有重复利用啊,新建一个线程,执行一个任务,然后就结束了,销毁了。没什么特别的啊,难道有什么地方漏掉了,被忽略了?
仔细回顾下该方法中的while循环的第二个判断(task = getTask)!=null
玄机就在getTask方法中:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) { //死循环
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed变量用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量,即存在工作线程;
// 对于超过核心线程数量的这些线程或者允许核心线程进行超时控制的时候,需要进行超时控制
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时(timedOut开始为false,后面的循环末尾超时时会置为true)
// 或者当前线程数量已经超过了最大线程数量,那么尝试将workerCount减1,即当前活动线程数减1,
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// timed=true则执行poll方法,反之则执行take方法
// 注意workQueue中的poll()方法与take()方法的区别
//poll方式取任务的特点是阻塞从缓存队列中取任务,最久阻塞keepAliveTime的时长,若超过这个时间还取不到任务则退出阻塞返回null
//take方式取任务的特点是阻塞从缓存队列中取任务,若队列为空,一直阻塞,直到能取出对象为止
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; // 能走到这里,说明执行的是poll方法,而且已经超时了
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从这段代码中可以看出,真正导致线程一直存活的关键就是poll与take这两个方法,由于他们的阻塞,再配上runWorker方法中的while循环,我们线程的run方法就永远不会执行完,线程也就一直处于活跃状态,而当poll方法超时时,getTask方法返回false,导致runWorker方法退出while循环,这个时候run方法就执行完了,这个线程将等待销毁,这样就实现了空闲线程超时销毁的功能。而我们的线程通过runWorker的while循环不断地从缓存队列中取出任务,然后执行任务的run方法,然后回到while循环的开头继续从队列中取出任务,这样就达到了线程复用的目的。
我们用个例子来看下:
假设我们有这么一个ThreadPoolExecutor,核心线程数设置为5(不允许核心线程超时),最大线程数设置为10,超时时间为20s,线程队列是LinkedBlockingDeque(相当于是个无界队列)。
当我们给这个线程池陆续添加任务,前5个任务执行的时候,会执行到我们之前分析的execute方法的第一步部分,会陆续创建5个线程做为核心线程执行任务,当前线程里面的5个关联的任务执行完成后,会进入各自的while循环的第二个判断getTask中去取队列中的任务,假设当前没有新的任务过来也就是没有执行execute方法,那么这5个线程就会在workQueue.take()处一直阻塞的。这个时候,我们执行execute加入一个任务,即第6个任务,这个时候会进入execute的第二部分,将任务加入到队列中,一旦加入队列,之前阻塞的5个线程其中一个就会被唤醒取出新加入的任务执行了。
在我们这个例子中,由于队列是无界的,所以始终不会执行到execute的第三部分即启动非核心线程,假如我们设置队列为有界的,那么必然就会执行到这里了
需要注意的是:
“核心线程”、“工作线程”是一个虚拟的概念,是为了方便描述而虚拟出来的概念,在代码中并没有哪个线程被标记为“核心线程”或“工作线程”,所有线程都是一样的,只是当线程池中的线程多于指定的核心线程数量时,会将多出来的线程销毁掉,池中只保留指定个数的线程。哪些线程被销毁是随机的,可能是第一个创建的线程,也可能是最后一个创建的线程,或其它时候创建的线程。
一开始我以为会有一些线程被标记为“核心线程”,而其它的则是“工作线程”,在销毁多余线程的时候只销毁那些“工作线程”,而“核心线程”不被销毁。但事实证明这种理解是错误的。
4、六种线程池的比较
最开始我们列出了五种线程池,在通过对源代码的分析之后,我们能够发现这五种线程池其实就是通过给构造函数里的7个参数赋予不同的值所组合而成,那么接下来我们比较一下这五种线程池之间的差异:
1)newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
通过构造函数我们可以看出,这个线程池核心线程数为0,最大线程数为无限大,使用的SynchronousQueue队列,当有任务来并且没有空闲线程时,execute方法会使用addWorker(null, false)创建一个新线程去获取队列里的任务,这时任务就能被添加到SynchronousQueue队列中,当没有任务时,所有线程会在1分钟后被销毁。
适用:创建一个可以无限扩大的线程池,执行很多短期异步的小程序或者负载较轻的服务器,不适用于高负荷的情况
特征: (1)线程数量不固定,可以为0,也可以达到最大值(Interger. MAX_VALUE) (2)线程池中的线程可进行缓存重复利用和回收(回收销毁默认时间为1分钟) (3)当线程池中,没有可用线程,会重新创建一个线程
2)newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
由构造函数来看,线程池的核心线程数和最大线程数相等,且为一个固定值,使用的无界等待队列,因此此线程池的线程数一直都是nThreads个不会改变,所以这个线程池是一个固定大小的线程池,超过数量的任务会放在队列中等待。
适用:控制并发量,执行长期的任务,性能好很多
特征: (1)线程池中的线程处于一定的量,可以很好的控制线程的并发量 (2)线程可以重复被使用,在显示关闭之前,都将一直存在 (3)超出一定量的线程被提交时候需在队列中等待
3)newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
这也是一个定长线程池,它的特殊之处在于等待队列,DelayedWorkQueue是一个优先级队列,能够根据任务的延时进行排序,延时短的任务放在前面,延时长的任务排在后面,因此我们可以对任务进行定时,控制任务在什么时候被执行,也能周期性执行任务。
适用:周期性执行任务的场景
特征: (1)线程池中具有指定数量的线程,即便是空线程也将保留 (2)可定时或者延迟执行线程活动
4)newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
这是一个单线程线程池,池中只会有一个线程在执行任务,多余的任务会放在无界队列中进行等待。
适用:一个任务一个任务执行的场景,可以防止多个任务同时执行
特征: (1)线程池中最多执行1个线程,之后提交的线程活动将会排在队列中依次执行
5)newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
顾名思义这个线程池就是newSingleThreadExecutor与newScheduledThreadPool的结合,单线程的线程池,同时能够定时执行任务。
适用:周期性执行任务,并且需要一个一个执行的场景
特征: (1)线程池中最多执行1个线程,之后提交的线程活动将会排在队列中依次执行 (2)可定时或者延迟执行线程活动
6)newWorkStealingPool
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
这个线程池是jdk1.8才新加入的,它并不是ThreadPoolExecutor的扩展,而是另一类线程池ForkJoinPool的扩展,它能创建cpu数量的线程,并且每个线程都单独拥有一个等待队列,实现了多个cpu并行执行任务,如果当前线程的等待队列空了,它可以去其他线程的队列中偷任务来执行。
适用:可以良好的利用多核cpu并行执行任务,适用于执行很耗时的任务