并发编程工具
# 线程池
线程池是一种多线程处理的方式,它允许线程在处理完一个任务后不被销毁,而是可以被重复使用,从而提高线程的利用率、提高程序的性能,和对线程执行的管理。在 Java 中,线程池是通过 java.util.concurrent
包提供的 Executor
框架来实现的。
在 Java 中,你可以通过 Executors
工厂类创建不同类型的线程池。常见的线程池包括:
newFixedThreadPool(int n)
:创建一个固定大小的线程池,其线程数量始终不变,当有新任务提交时,若有空闲线程则立即执行,若没有则任务加入队列等待,当有线程空闲时自动执行队列中的任务。newSingleThreadExecutor()
:创建只有一个线程的线程池,保证所有任务按照指定顺序执行,在任意瞬间,最多只有一个任务被执行。newCachedThreadPool()
:创建一个可缓存的线程池,如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲的线程,当任务数增加时,此线程池又可以智能地添加新线程来处理任务。newScheduledThreadPool(int corePoolSize)
:创建一个定长线程池,支持定时及周期性任务执行。
# 理解
当谈及线程池时,我们会理解到它是用来存储一些 worker 和 job 的一个区域。
线程池模型的理解
首先我们需要理解,如果我们不配置 Schedule Pool 的话,只用它作为线程时的一个经典模型。可以将 Submit 想象成是生产者提交一个任务。然后将里面创建出来的 Worker 想象成是一个消费者,处理一个 job 就是消费一个任务。它里面有一个 Blocking Queue,Blocking Queue 可以将其想象成是一个队列。然后,生产者将生产出的消息放到队列里面,也就是将任务放到队列里面。然后消费者从队列中获取任务并执行,主要是这样一个模型。
使用线程池
要使用线程池,有两种方式:自动方式和手动方式。
首先我们先谈一下自动方式
第一个是 New Fixed Pool,主要是为了固定消费者数量这样的一个场景。这种线程池一般用在 work 的数量上,需要根据实际情况进行设置。这种线程池比较适应的场景是我们已知道这个任务是一个单体的应用,它可以给多少个线程,我们已知可以给多少个线程,多少个线程就是它的上限了。然后我们可以将这个数值放进去,也就是说这个线程池的消费者数量是一个固定的值。但是它也会有一些问题,就比如说,如果设置的这个消费者的数量不够,那么就无法处理完那么多的任务。这时就会导致队列不断增长,直到队列塞满,它只能走拒绝策略,导致必然丢失很多任务。这也是与消息消费模型很类似的一个点。
第二个是 New Single Pool,就是套用到我们刚才的模型中,也就是一个 worker 去处理任务。它也会带来队列过长导致的一些问题。这是只有一个线程的线程池,它可以避免频繁创建和销毁线程所带来的性能开销。
用官方一点的话来说,newSingleThreadExecutor返回一个包含单线程的Executor,将多个任务交给此Exector时,这个线程处理完一个任务后接着处理下一个任务(按blocking queue顺序的),若该线程出现异常,将会有一个新的线程来替代。
第三个是 newCachedThreadPool。根据用户的任务数创建相应的线程来处理,该线程池不会对线程数目加以限制,完全依赖于JVM能创建线程的数量,可能引起内存不足。 底层是基于ThreadPoolExecutor实现,借助reentrantlock保证并发。 coreSize核心线程数,maxsize最大线程数。
还有一个叫做 New Scheduled Pool,就是一个定时的队列。这个队列与刚才说的生产者生产消费模型有一点区别,就是多了一个延时队列。任务可能是隔一秒钟之后再去执行,或者隔几秒钟之后再去执行,主要用到了一个延时队列。
// 使用标准构造器,构造一个普通的线程池
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数,即使线程空闲(Idle),也不会回收;
int maximumPoolSize, // 线程数的上限;
long keepAliveTime, TimeUnit unit, // 线程最大空闲(Idle)时长,超过这个时长,多余的worker就会被回收
BlockingQueue workQueue, // 任务的排队队列
ThreadFactory threadFactory, // 新线程的产生方式
RejectedExecutionHandler handler) // 拒绝策略
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
【定时任务线程池】:延时队列实现
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
try {
// 1:指定任务首次执行的延迟时间,这里表示首次执行会延迟 1 单位的时间。
// 3:指定连续两次任务开始执行的时间间隔,这里表示每隔 3 单位的时间执行一次任务。
scheduledPool.scheduleAtFixedRate(Task(), 1, 3, TimeUnit.SECONDS);
} finally {
scheduledPool.shutdown();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 线程池数量
当谈论线程池的数量时,我们指的是线程池中的工作线程数量。我们设置这些工作线程的数量主要取决于要执行的任务类型,即它是属于I/O密集型的业务还是CPU密集型的业务。
- 如果我们要执行的任务涉及大量的统计计算等操作,通常会将工作线程的数量设定为与CPU内核数相同。
- 而如果当前的任务是I/O密集型的,涉及大量的文件拷贝等操作,我们就不需要占用太多的CPU资源。在这种情况下,我们可以适当增加工作线程的数量,让CPU只分配少量的时间片给每个工作线程执行任务。这样可以大大提高整体的吞吐量,实现优化。
# 拒绝策略
拒绝的时机
第一个是当这个 executor 关闭的时候,再去提交新的任务就会被拒绝。第二种是我们 eecutor 的工作队列已经满了,没有办法再接受新的任务,这个时候就会走到我们的拒绝策略中
拒绝策略 在Java的线程池中,常见的拒绝策略有四种:
- Caller-Runs Policy (调用者运行策略):当任务无法被处理时,交给提交任务的线程自己去尝试执行这个任务。这样的话,提交任务的线程会尝试处理这个任务,从而减缓提交速度,但有可能造成当前线程阻塞。
- caller-runs policy 是一种简单的实现限流的方式。因此在一些消费型任务中可以考虑使用,而面向c端,高响应接口的不建议使用,避免客户端调用已经超时,服务端还在继续处理任务的情况。
- Abort Policy (抛弃策略):当任务无法被处理时,抛出一个未检查的异常 RejectedExecutionException。
- Discard Policy (丢弃策略):当任务无法被处理时,默默地丢弃即将被提交的任务,不做任何处理,这意味着任务会被丢弃并且不会给出任何提示。异常对于发现问题很重要,因此使用这种策略要谨慎考虑,看异常是否可忽略。
- Discard Oldest Policy (丢弃最老的任务策略):当任务无法被处理时,丢弃工作队列中最旧的任务,然后尝试重新提交当前任务。这种策略往往会导致最重要的任务被丢弃
# FutureTask
# 概念
这个东西很简单一句话来说,就是:
- FutureTask叫未来任务,可以将一个复杂的任务剔除出去交给另外一个线程来完成
- 【具体一点】:
FutureTask
是用于包装一个即将执行的任务,并且可以异步地交给其他线程去处理。它提供了一些 api 来启动任务的执行,并在任务执行完成后获取其结果。
# 常用API
get
方法:这个方法用于获取异步任务的结果。如果任务已经完成,则它会立即返回结果;如果任务尚未完成,它将会阻塞主线程,直到任务完成并返回结果。然而,在FutureTask
中并没有称为 "ti 帽的 get ti 帽的方法"。get
方法的超时处理:在FutureTask
中,确实有一个重载的get
方法,允许您指定等待的最大时间,在这段时间内如果异步任务没有返回结果,它将抛出 TimeoutException。cancel
方法:FutureTask
允许您取消任务的执行。调用cancel
时,如果任务还没有开始执行,它将有可能被取消;但是如果任务已经开始执行或者已经执行完成,cancel
方法将返回 false,并且任务不会被取消。isDone
方法:这个方法用于查询异步任务是否已经完成。您提到的 "extand" 方法可能是指isDone
方法的功能,用来查询异步任务是否已经完成操作。
# 启动一个子任务
import java.util.concurrent.FutureTask;
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
// 创建一个 FutureTask,并传入一个 Callable 对象
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
int result = 0;
for (int i = 1; i <= 5; i++) {
result += i;
// 模拟耗时任务
try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}
}
return result;
});
// 启动任务的执行 (这里可以放到线程池里面去做)
// 等价于 xxxPool.submit(futureTask)
new Thread(futureTask).start();
// 在这里可以进行一些其他操作
// 获取任务的执行结果,如果任务尚未完成,这里会阻塞等待直到任务完成
int result = futureTask.get();
System.out.println("任务执行结果:" + result);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26