目 录CONTENT

文章目录

JUC - 线程池与异步编排

lionkliu
2022-09-09 / 0 评论 / 0 点赞 / 26 阅读 / 6,584 字

1、创建线程的四种方式

1.1 继承Thread

/**
 * @author : lyj
 * @Timer : 2022/9/8
 * @Description :
 */
public class TestThread {
    public static void main(String[] args) {
        System.out.println("main ..... start");
        Thread01 thread01 = new Thread01();
        thread01.start();
        System.out.println("main ..... end");

    }

    public static class Thread01 extends Thread {
        @Override
        public void run() {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2 ;
            System.out.println("运行结果:" + i);
        }
    }

}

image-20220908103537022

从运行结果可以看出,main线程并不会等待thread01线程运行再运行。

1.2 实现Runnable接口

/**
 * @author : lyj
 * @Timer : 2022/9/8
 * @Description :
 */
public class TestThread {
    public static void main(String[] args) {
        System.out.println("main ..... start");
        Thread02 thread02 = new Thread02();
        new Thread(thread02).start();
        System.out.println("main ..... end");

    }

    public static class Thread02 implements Runnable {

        @Override
        public void run() {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }
    }

}

image-20220908103934919

1.3 实现Callable接口

/**
 * @author : lyj
 * @Timer : 2022/9/8
 * @Description :
 */
public class TestThread {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println("main ..... start");
        FutureTask<Integer> futureTask = new FutureTask<>(new Thread03());
        new Thread(futureTask).start();

        // 阻塞等待线程执行完成,获得结果
        Integer result = futureTask.get();
        System.out.println("main ..... end.. " + " result: " + result);

    }

    public static class Thread03 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }
    }
}

image-20220908105001244

可以看出到futureTask.get();这一段程序时,main线程 阻塞等待线程执行完成,获得结果、

1.4 线程池

在一般的业务代码中并不会使用上面三种方式启动线程,如一个业务里面new了十个线程,而此时有100万个并发进入我们系统,此时系统就会同时new出1000万个线程,我们的系统也会因为资源耗尽而崩溃。当使用线程池的时候,比如我们在初始化的时候给线程池初始化了50个线程,当系统需要线程的时候就会将任务提交给线程池,用完之后线程会被放回线程池中,这50个线程就会被重复使用,提高我们系统的可靠性。

使用线程池的优点:

  • 降低资源的消耗
    • 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
  • 提高响应速度
    • 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
  • 提高线程的可管理性
    • 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配
public class TestThread {
    // 初始化线程池里面10个线程
    public static ExecutorService service = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        System.out.println("main ..... start");
        service.execute(new Thread02());
        System.out.println("main ..... end.. " );

    }

    public static class Thread02 implements Runnable {

        @Override
        public void run() {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }
    }
}

image-20220908111006105

2、线程池详解

2.1 线程池的创建

① 通过Executors工具类

Executors类提供4个静态工厂方法:

  • newCachedThreadPool():创建一个可缓存的无界线程池,如果线程池长度超过处理需要,可灵活回收空线程,若无可回收,则新建线程。当线程池中的线程空闲时间超过60s,则会自动回收该线程,当任务超过线程池的线程数则创建新的线程,线程池的大小上限为Integer.MAX_VALUE,可看作无限大。
  • newFixedThreadPool(int):创建一个指定大小的线程池,可控制线程的最大并发数,超出的线程会在LinkedBlockingQueue阻塞队列中等待
  • newSingleThreadExecutor:创建一个单线程化的线程池,它只有一个线程,用仅有的一个线程来执行任务,保证所有的任务按照指定顺序(FIFO,LIFO,优先级)执行,所有的任务都保存在队列LinkedBlockingQueue中,等待唯一的单线程来执行任务
  • newScheduledThreadPool(int):创建一个定长的线程池,可以指定线程池核心线程数,支持定时及周期性任务的执行

这些方法最终都是通过ThreadPoolExecutor类来完成的

② 原始方法: ThreadPoolExecutor

        ExecutorService executorService = new ThreadPoolExecutor(5,
                200,
                10,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(100000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

2.2 七大参数

ThreadPoolExecutor 构造函数源码如下:

image-20220908112050052

  1. int corePoolSize:(核心线程数),线程池创建好以后就准备就绪的线程数量,等待异步任务去执行。一直存在即使它们是空闲的,除非设置allowCoreThreadTimeOut。条件:必须大于或等于0;

  2. int maximumPoolSize(最大线程数)条件:必须大于或等于1,maximumPoolSize必须大于或等于corePoolSize;

  3. long keepAliveTime(线程存活保持时间)如果当前的线程数量大于 core 数量,只要线程空闲大于指定的 keepAliveTime ,就会释放空闲的线程(maximumPoolSize - corePoolSize) 。条件:必须大于或等于0;

  4. TimeUnit unit:keepAliveTime参数的时间单位

  5. BlockingQueue<Runnable> workQueue(阻塞队列)如果任务很多,多的任务就会放在队列里,只要线程空闲,就会去队列里取出新任务执行

  6. ThreadFactory threadFactory(线程的创建工厂)不能为空,默认为DefaultThreadFactory类

  7. RejectedExecutionHandler handler(线程饱和拒绝策略)不能为空,默认策略为ThreadPoolExecutor.AbortPolicy。

2.3 运行流程

1、线程池创建,准备好 core 数量的核心线程,准备接受任务

2、新的任务进来,使用 core 准备好的空闲线程执行

  • core 满了:将再进来的任务放入阻塞队列中,空闲的 core 就会自己去阻塞队列获取任务执行
  • 阻塞队列满了:直接开新线程执行,最大开到 max 指定的数量
  • max 都执行好了:Max-core 数量空闲的线程会在 keepAliveTime 指定的时间后自动销毁。最终到 core 数
  • 如果线程数开到了 max 数量,还有新任务进来,就会使用 reject指定的拒绝策略进行处理

3、所有的线程创建有指定的factory 创建的

示例:

一个线程池:core:7 ,max:20 ,queue:50,100个并发进来,怎么分配的?
7个会立即得到执行,50个会进入队列,然后再开13个进行执行。剩下的30个就使用拒绝策略。|

3、CompletableFuture 异步编排

3.1 创建异步任务

CompletableFuture 提供了四个静态方法来创建一个异步操作:

public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor);

说明:runAsync无返回值,supplyAsync有返回值。Executor executor可指定线程池

/**
 * @author : lyj
 * @Timer : 2022/9/8
 * @Description :
 */
public class TestCompletableFuture {
    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    @SneakyThrows
    public static void main(String[] args) {
        System.out.println("main ..... start... ");

        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }, executor);

        CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, executor);
        Integer res = supplyFuture.get();

        System.out.println("main ..... end... " + res);
    }
}

3.2 计算完成回调方法

image-20220908152512042

whenComplete

CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 2;
    System.out.println("运行结果:" + i);
    return i;
}, executor).whenComplete((res,exception)->{
    System.out.println("异步任务完成,res:"+res + " ,异常 exception:"+exception);
});

exceptionally

CompletableFuture<Integer> supplyFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运行结果:" + i);
            return i;
        }, executor).whenComplete((res, exception) -> {
            System.out.println("异步任务完成,res:" + res + " ,异常 exception:" + exception);
        }).exceptionally(throwable -> {
            // 可以感知异常。同时返回默认值
            return 10;
        });

handle

CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程:" + Thread.currentThread().getId());
    int i = 10 / 0;
    System.out.println("运行结果:" + i);
    return i;
}, executor).handle((res, thr) -> {
    if (res != null) {
        return res * 2;
    }
    // 有异常
    if (thr != null) {
        return 0;
    }
    return 0;
});

whenComplete 、exceptionally 和 handle 区别:

  • whenComplete可以得到异常消息,但没法修改返回数据
  • exceptionally 可以感知异常,如果出现异常,可以自定义返回值

3.3 线程串行化

0

评论区

// // // //