1、创建线程的四种方式
1.1 继承Thread
/**
* @author : lyj
* @Timer : 2022/9/8
* @Description :
*/
public class TestThread {
public static void main(String[] args) {
System.out.println("main ..... start");
Thread01 thread01 = new Thread01();
thread01.start();
System.out.println("main ..... end");
}
public static class Thread01 extends Thread {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2 ;
System.out.println("运行结果:" + i);
}
}
}
从运行结果可以看出,main线程并不会等待thread01线程运行再运行。
1.2 实现Runnable接口
/**
* @author : lyj
* @Timer : 2022/9/8
* @Description :
*/
public class TestThread {
public static void main(String[] args) {
System.out.println("main ..... start");
Thread02 thread02 = new Thread02();
new Thread(thread02).start();
System.out.println("main ..... end");
}
public static class Thread02 implements Runnable {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
}
1.3 实现Callable接口
/**
* @author : lyj
* @Timer : 2022/9/8
* @Description :
*/
public class TestThread {
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main ..... start");
FutureTask<Integer> futureTask = new FutureTask<>(new Thread03());
new Thread(futureTask).start();
// 阻塞等待线程执行完成,获得结果
Integer result = futureTask.get();
System.out.println("main ..... end.. " + " result: " + result);
}
public static class Thread03 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}
}
}
可以看出到futureTask.get();
这一段程序时,main线程 阻塞等待线程执行完成,获得结果、
1.4 线程池
在一般的业务代码中并不会使用上面三种方式启动线程,如一个业务里面new了十个线程,而此时有100万个并发进入我们系统,此时系统就会同时new出1000万个线程,我们的系统也会因为资源耗尽而崩溃。当使用线程池的时候,比如我们在初始化的时候给线程池初始化了50个线程,当系统需要线程的时候就会将任务提交给线程池,用完之后线程会被放回线程池中,这50个线程就会被重复使用,提高我们系统的可靠性。
使用线程池的优点:
- 降低资源的消耗
- 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
- 提高响应速度
- 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
- 提高线程的可管理性
- 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
public class TestThread {
// 初始化线程池里面10个线程
public static ExecutorService service = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
System.out.println("main ..... start");
service.execute(new Thread02());
System.out.println("main ..... end.. " );
}
public static class Thread02 implements Runnable {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
}
2、线程池详解
2.1 线程池的创建
① 通过Executors
工具类
Executors类提供4个静态工厂方法:
newCachedThreadPool()
:创建一个可缓存的无界线程池,如果线程池长度超过处理需要,可灵活回收空线程,若无可回收,则新建线程。当线程池中的线程空闲时间超过60s,则会自动回收该线程,当任务超过线程池的线程数则创建新的线程,线程池的大小上限为Integer.MAX_VALUE,可看作无限大。newFixedThreadPool(int)
:创建一个指定大小的线程池,可控制线程的最大并发数,超出的线程会在LinkedBlockingQueue阻塞队列中等待newSingleThreadExecutor
:创建一个单线程化的线程池,它只有一个线程,用仅有的一个线程来执行任务,保证所有的任务按照指定顺序(FIFO,LIFO,优先级)执行,所有的任务都保存在队列LinkedBlockingQueue中,等待唯一的单线程来执行任务newScheduledThreadPool(int)
:创建一个定长的线程池,可以指定线程池核心线程数,支持定时及周期性任务的执行
这些方法最终都是通过ThreadPoolExecutor
类来完成的
② 原始方法: ThreadPoolExecutor
ExecutorService executorService = new ThreadPoolExecutor(5,
200,
10,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(100000),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
2.2 七大参数
ThreadPoolExecutor 构造函数源码如下:
-
int corePoolSize
:(核心线程数),线程池创建好以后就准备就绪的线程数量,等待异步任务去执行。一直存在即使它们是空闲的,除非设置allowCoreThreadTimeOut。条件:必须大于或等于0; -
int maximumPoolSize
(最大线程数)条件:必须大于或等于1,maximumPoolSize必须大于或等于corePoolSize; -
long keepAliveTime
(线程存活保持时间)如果当前的线程数量大于 core 数量,只要线程空闲大于指定的 keepAliveTime ,就会释放空闲的线程(maximumPoolSize - corePoolSize) 。条件:必须大于或等于0; -
TimeUnit unit
:keepAliveTime参数的时间单位 -
BlockingQueue<Runnable> workQueue
(阻塞队列)如果任务很多,多的任务就会放在队列里,只要线程空闲,就会去队列里取出新任务执行 -
ThreadFactory threadFactory
(线程的创建工厂)不能为空,默认为DefaultThreadFactory类 -
RejectedExecutionHandler handler
(线程饱和拒绝策略)不能为空,默认策略为ThreadPoolExecutor.AbortPolicy。
2.3 运行流程
1、线程池创建,准备好 core 数量的核心线程,准备接受任务
2、新的任务进来,使用 core 准备好的空闲线程执行
- core 满了:将再进来的任务放入阻塞队列中,空闲的 core 就会自己去阻塞队列获取任务执行
- 阻塞队列满了:直接开新线程执行,最大开到 max 指定的数量
- max 都执行好了:Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自动销毁。最终到 core 数
- 如果线程数开到了 max 数量,还有新任务进来,就会使用 reject指定的拒绝策略进行处理
3、所有的线程创建有指定的factory 创建的
示例:
一个线程池:core:7 ,max:20 ,queue:50,100个并发进来,怎么分配的?
7个会立即得到执行,50个会进入队列,然后再开13个进行执行。剩下的30个就使用拒绝策略。|
3、CompletableFuture 异步编排
3.1 创建异步任务
CompletableFuture 提供了四个静态方法来创建一个异步操作:
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);
说明:runAsync无返回值,supplyAsync有返回值。
Executor executor
可指定线程池
/**
* @author : lyj
* @Timer : 2022/9/8
* @Description :
*/
public class TestCompletableFuture {
public static ExecutorService executor = Executors.newFixedThreadPool(10);
@SneakyThrows
public static void main(String[] args) {
System.out.println("main ..... start... ");
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}, executor);
CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor);
Integer res = supplyFuture.get();
System.out.println("main ..... end... " + res);
}
}
3.2 计算完成回调方法
whenComplete
CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((res,exception)->{
System.out.println("异步任务完成,res:"+res + " ,异常 exception:"+exception);
});
exceptionally
CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenComplete((res, exception) -> {
System.out.println("异步任务完成,res:" + res + " ,异常 exception:" + exception);
}).exceptionally(throwable -> {
// 可以感知异常。同时返回默认值
return 10;
});
handle
CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).handle((res, thr) -> {
if (res != null) {
return res * 2;
}
// 有异常
if (thr != null) {
return 0;
}
return 0;
});
whenComplete 、exceptionally 和 handle 区别:
- whenComplete可以得到异常消息,但没法修改返回数据
- exceptionally 可以感知异常,如果出现异常,可以自定义返回值
评论区