池化之线程池
java中池化技术是提前保存大量的资源,以备不时之需以及重复使用。
1、池化技术
在实际应用当做,分配内存、创建进程、线程都会设计到一些系统调用,系统调用需要导致程序从用户态切换到内核态,是非常耗时的操作。因此,当程序中需要频繁的进行内存申请释放,进程、线程创建销毁等操作时,通常会使用内存池、进程池、线程池技术来提升程序的性能。
进程池、线程池:先启动若干数量的线程,并让这些线程都处于睡眠状态,当需要一个开辟一个线程去做具体的工作时,就会唤醒线程池中的某一个睡眠线程,让它去做具体工作,当工作完成后,线程又处于睡眠状态,而不是将线程销毁。当线程数达到一定数量时,可以在队列中等待。
内存池:内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
2、线程池好处
几乎所有需要异步或者并发执行任务的程序都可以使用线程池。
合理使用会给我们带来以下好处。
降低系统消耗:重复利用已经创建的线程降低线程创建和销毁造成的资源消耗。
提高响应速度:当任务到达时,任务不需要等到线程创建就可以立即执行。
提供线程可以管理性:可以通过设置合理分配、调优、监控。
频繁手动创建线程为什么开销会大?跟new Object() 有什么差别?
虽然Java中万物皆对象,但是new Thread() 创建一个线程和 new Object()还是有区别的。
new Object()过程如下:
JVM分配一块内存 M
在内存 M 上初始化该对象
将内存 M 的地址赋值给引用变量 obj
创建线程的过程如下:
JVM为一个线程栈分配内存,该栈为每个线程方法调用保存一个栈帧
每一栈帧由一个局部变量数组、返回值、操作数堆栈和常量池组成
每个线程获得一个程序计数器,用于记录当前虚拟机正在执行的线程指令地址
系统创建一个与Java线程对应的本机线程
将与线程相关的描述符添加到JVM内部数据结构中
线程共享堆和方法区域
创建一个线程大概需要1M左右的空间(Java8,机器规格2c8G)。可见,频繁手动创建/销毁线程的代价是非常大的。
3、线程池工作流程
1、判断核心线程池里的线程是否都有在执行任务,否->创建一个新工作线程来执行任务。是->走下个流程。
2、判断工作队列是否已满,否->新任务存储在这个工作队列里,是->走下个流程。
3、判断线程池里的线程是否都在工作状态,否->创建一个新的工作线程来执行任务,是->走下个流程。
4、按照设置的策略来处理无法执行的任务。
4、线程池的创建
// 创建线程工厂实例
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
// 创建线程池,核心线程数、最大线程数、空闲保持时间、队列长度、拒绝策略可自行定义
ExecutorService pool = new ThreadPoolExecutor(5, 20, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
1.corePoolSize:
核心线程池大小,当提交一个任务时,线程池会创建一个线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建,等待需要执行的任务数大于线程核心大小就不会继续创建。
2.maximumPoolSize:
线程池最大数,允许创建的最大线程数,如果队列满了,并且已经创建的线程数小于最大线程数,则会创建新的线程执行任务。如果是无界队列,这个参数基本没用。
3.keepAliveTime:
线程保持活动时间,线程池工作线程空闲后,保持存活的时间,所以如果任务很多,并且每个任务执行时间较短,可以调大时间,提高线程利用率。
4.unit:
线程保持活动时间单位,天(DAYS)、小时(HOURS)、分钟(MINUTES、毫秒MILLISECONDS).微秒(MICROSECONDS)、纳秒(NANOSECONDS)
5.workQueue:
任务队列,保存等待执行的任务的阻塞队列。一般来说可以选择如下阻塞队列:
(1) ArrayBlockingQueue:基于数组的有界阻塞队列。
(2)LinkedBlockingQueue:基于链表的阻塞队列。
(3)SynchronizedQueue:一个不存储元素的阻塞队列。
(4)PriorityBlockingQueue:一个具有优先级的阻塞队列。
6.threadFactory:
设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
7.handler:
饱和策略也叫拒绝策略。当队列和线程池都满了,即达到饱和状态。所以需要采取策略来处理新的任务。默认策略是AbortPblicy
(1)AbortPolicy:直接抛出异常。
(2)CallerRunsPolicy:调用者所在的线程来运行任务。
(3)DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
(4)DiscardPolicy:不处理,直接丢掉。
当然可以根据自己的应用场景,实现RejectedExecutionHandler接口自定义策略。
5、向线程池提交任务
可以使用execute()和submit()两种方式提交任务。
execute():无返回值,所以无法判断任务是否被执行成功。
submit(:用于提交需要有返回值的任务。线程池返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()来获取返回值,get()方法会阻塞当前线程知道任务完成。get(long timeout,TimeUnit unit)可以设置超时时间。
6、线程池的关闭
(1)正常关闭
ExecutorService pool=...;
// 用于线程内无迭代,且预期在短时间内能执行完毕的线程任务;
pool.shutdown();
// 用于线程内有迭代逻辑,或执行完成时间无法预估的场景(此类线程任务代码必须进行中断信号的处理);
pool.shutdownNow();
可以通过shutdown()或shutdownNow()来关闭线程池。
它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt来中断线程,所以无法响应终端的任务可以能永远无法停止
shutdownNow首先将线程池状态设置成STOP;然后尝试停止所有的正在执行或者暂停的线程,并返回等待执行任务的列表。
shutdown只是将线程池的状态设置成shutdown状态,然后中断所有没有正在执行任务的线程。
只要调用两者之一,isShutdown就会返回true,当所有任务都已关闭,isTerminaed就会返回true。一般来说调用shutdown方法来关闭线程池,如果任务不一定要执行完,可以直接调用shutdownNow方法。
(2)中断异常处理
//所有会引起中断异常的代码段都要单独处理中断异常,绝对不可合并在Exception或Throwable里处理;并根据代码结构和业务逻辑判断,是否需要恢复中断异常
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
// 中断处理
} catch (Exception e) {
// 其它异常处理
}
(3)库代码中断异常处理
//库代码中断异常处理主要有两种方式,如下:
- 传递InterruptedException:避开这个异常通常是最明智的策略——只需将InterruptedException传递给方法的调用者。
- 恢复中断:有时不能抛出InterruptedException,例如代码位于Runnable中时。在这些情况下,捕获InterruptedException并使用当前线程上的Interrupt方法恢复中断状态,这样在调用栈中更高层的代码将看到引发了一个中断。
(4)检查中断状态
// 测试当前线程是否已经中断。线程的中断状态 由该方法清除。
Thread.interrupted();
// 测试线程是否已经中断。线程的中断状态不受该方法的影响。
Thread.currentThread().isInterrupted();
(5)线程内有迭代逻辑时的中断处理参考代码模板
// 迭代必须包含对中断信号的响应,以及对中断异常的处理
while (线程中断状态为false) {
// 业务代码
}
int listSize = 100;
for (int i = 0; i < listSize; i++) {
if (线程中断状态为false) {
break;
}
// 业务代码
}
7、线程池如何配置合理
配置线程池可以从以下几个方面考虑。
任务是cpu密集型、IO密集型或者混合型·任务优先级,高中低。
任务时间执行长短。
任务依赖性:是否依赖其他系统资源。
cpu密集型可以配置可能小的线程,比如n+1个线程。io密集型可以配置较多的线程,如2n个线程。
混合型可以拆成io密集型任务和cpu密集型任务,
如果两个任务执行时间相差大,否->分解后执行吞吐量将高于串行执行吞吐量。否->没必要分解。
可以通过Runtime.getRuntime().availableProcessors()来获取cpu个数。建议使用有界队列,增加系统的预警能力和稳定性。
8、JDK线程示例
(0)FixedThreadPool
可重用固定线程数的线程池。查看源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
corePoolSize和maxPoolSize都被设置成我们设置的nThreads。
当线程池中的线程数大于corePoolSize ,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止,如果设为0,表示多余的空闲线程会立即终止。
工作流程:
1.当前线程少于corePoolSize,创建新线程执行任务。
2.当前运行线程等于corePoolSize,将任务加入LinkedBlockingQueue。
3.线程执行完1中的任务,会循环反复从LinkedBlockingQueue获取任务来执行。LinkedBlockingQueue作为线程池工作队列(默认容量Integer.MAX_VALUE)。因此可能会造成如下。
3.1.当线程数等于corePoolSize时,新任务将在队列中等待,因为线程池中的线程不会超过corePoolSize。
3.2.maxnumPoolSize等于说是一个无效参数。
3.3.keepAliveTime等于说也是一个无效参数。
3.4.运行中的FixedThreadPool(未执行shundown或shundownNow)则不会调用拒绝策略。
3.5.由于任务可以不停的加到队列,当任务越来越多时很容易造成OOM。
(1)SingleThreadExecutor
是使用单个worker线程的Executor。查看源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
corePoolSize和maxnumPoolSize被设置为1。其他参数和FixedThreadPool相同。执行流程以及造成的影响同FixedThreadPool。
(2)CachedThreadPool
根据需要创建新线程的线程池。查看源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
corePoolSize设置为0,maxmumPoolSize为Integer.MAX_VALUE。keepAliveTime为60秒。工作流程:
1.首先执行SynchronousQueue.offer (Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.pol(keepAliveTIme,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute方法执行完成;否则执行下面的步骤2。
2.当初始maximumPool为空或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll (keepAliveTime, TimeUnit.NANOSECONDS)。这种情况下,步骤1将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。
3.在步骤2中新创建的线程将任务执行完后,会执行SynchronousQueue.poll (keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
(3)创建定时任务
// 禁止使用Timer,一律使用ScheduledExecutorService
ScheduledExecutorService schedule =
new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("scheduled-%d").build());
// 创建并执行在给定延迟后启用的一次性操作
schedule.schedule(new DemoWorker(), 60000L, TimeUnit.MILLISECONDS);
/*
* 创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;
* 也就是将在 initialDelay 后开始执行,w然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
*/
schedule.scheduleAtFixedRate(new DemoWorker(), 60000L, 60000L, TimeUnit.MILLISECONDS);
// 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。
schedule.scheduleWithFixedDelay(new DemoWorker(), 60000L, 60000L, TimeUnit.MILLISECONDS);
9、建议
CountDownLatch实现所有线程达到某一个目标后,再继续进行下一步工作
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
ExecutorService executor = new ThreadPoolExecutor(5, 20, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
int count = 500;
// CountDownLatch的计数器需要和任务数相同,执行完一个任务调用countDown让计数器减一
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(50);
System.out.println("运行任务");
} catch (Exception e) {
e.printStackTrace();
}
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("都运行完了");
采用Lock加锁在finally块中释放锁,保证不论是否抛出异常锁一定会被释放:
Lock lock = new ReentrantLock();
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "得到了锁");
System.out.println("do something...");
} catch (Exception e) {
System.out.println("handle exception");
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + "释放了锁");
}
10、springboot线程池ThreadPoolTaskExecutor以及@Async异步注解使用方法
方式一:
在Application启动类上面加上@EnableAsync,在需要异步执行的方法上加上@Async注解
方式二:直接调用ThreadPoolTaskExecutor
配置线程池的类型与参数,如下
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池配置
**/
@Configuration
public class ThreadPoolConfig
{
// 核心线程池大小
private int corePoolSize = 50;
// 最大可创建的线程数
private int maxPoolSize = 200;
// 队列最大长度
private int queueCapacity = 1000;
// 线程池维护线程所允许的空闲时间
private int keepAliveSeconds = 300;
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
{
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(maxPoolSize);
executor.setCorePoolSize(corePoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
// 线程池对拒绝任务(无线程可用)的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
然后使用该配置
// 使用方式一
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
threadPoolTaskExecutor.submit(xxxxx);
// 使用方式二:必须指定bean名字不然就默认加载到其他的配置
@Async("threadPoolTaskExecutor")
public void hello(String name){
logger.info("异步线程启动 started."+name);
}