如何实现基于Spring Boot的线程池实时监控与报警机制?

2026-05-23 04:021阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计10760个文字,预计阅读时间需要44分钟。

前言:这篇是推动大家异步编程思想的核心线程池的预备篇,要做好监控,让大家使用无忧,敬畏生产。

为什么需要对线程池进行监控?Java线程池作为最常使用的并发工具,为什么如此信任它?

前言

这篇是推动大家异步编程的思想的线程池的准备篇,要做好监控,让大家使用无后顾之忧,敬畏生产。

为什么需要对线程池进行监控

Java线程池作为最常使用到的并发工具,相信大家都不陌生,但是你真的确定使用对了吗?大名鼎鼎的阿里Java代码规范要求我们不使用 Executors来快速创建线程池,但是抛弃Executors,使用其它方式创建线程池就一定不会出现问题吗?本质上对于我们来说线程池本身的运行过程是一个黑盒,我们没办法了解线程池中的运行状态时,出现问题没有办法及时判断和预警。面对这种黑盒操作必须通过监控方式让其透明化,这样对我们来说才能更好的使用好线程池。因此必须对线程池做监控。

image.png
如何做线程池的监控

对于如何做监控,本质就是涉及三点,分别是数据采集、数据存储以及大盘的展示,接下来我们分说下这三点;

数据采集

采集什么数据,对于我们来说需要采集就是黑盒的数据,什么又是线程池的黑盒数据,其实也就是整个线程处理的整个流程,在整个流程中,我们可以通过ThreadPoolExecutor中的七个方法获取数据,通过这七个方法采集到的数据就可以使线程池的执行过程透明化。

  1. getCorePoolSize():获取核心线程数;
  2. getMaximumPoolSize:获取最大线程数;
  3. getQueue():获取线程池中的阻塞队列,并通过阻塞队列中的方法获取队列长度、元素个数等;
  4. getPoolSize():获取线程池中的工作线程数(包括核心线程和非核心线程);
  5. getActiveCount():获取活跃线程数,也就是正在执行任务的线程;
  6. getLargestPoolSize():获取线程池曾经到过的最大工作线程数;
  7. getTaskCount():获取历史已完成以及正在执行的总的任务数量;

除了我们了解的这些流程以外,ThreadPoolExecutor中还提供了三种钩子函数,

  1. beforeExecute():Worker线程执行任务之前会调用的方法;
  2. afterExecute():在Worker线程执行任务之后会调用的方法;
  3. terminated():当线程池从运行状态变更到TERMINATED状态之前调用的方法;

对于beforeExecute和afterExecute可以理解为使用Aop监听线程执行的时间,这样子我们可以对每个线程运行的时间整体做监控,terminated可以理解为线程关闭时候的监控,这样我们就可以整体获取采集到线程池生命周期的所有数据了。

数据存储以及大盘的展示

对于存储我们这个比较适合采用时序性数据库,此外现在很多成熟的监控产品都可以满足我们大屏展示的诉求,这里推荐下美团Cat和Prometheus,这里不展开进行讲解,大家可以根据自己公司的监控产品进行选择,对于不同的方案采取的存储形式会有些差异,甚至自己都可以自定义实现一个功能,反正难度不大。

进一步扩展以及思考

在实际的项目开发中我们会遇到以下场景:

  1. 不同的业务采用同一个线程池,这样如果某个服务阻塞,会影响到整体共用线程池的所有服务,会触发线程池的拒绝策略;
  2. 流量突然增加,需要动态调整线程池的参数,这个时候又不能重启;

针对这两种场景,我们对线程池再次进行了深入的思考:

  1. 如何合理配置线程池参数;
  2. 如何动态调整线程池参数;
  3. 如何给不同的服务之间做线程池的隔离;
如何合理配置线程池参数

关于这个问题面试的时候也是经常被问到,我只能说这个问题开始就是一个坑,针对与CPU密集型和I/O密集型,线程池的参数是有不同设计的,也不是遵守几个公式就可以搞定,当然可以参考,我认为对于线程池合理的参数的配置是经过多次调整得到的,甚至增加和减少业务都会影响一些参数,我不太建议大家每天背书式的CPU密集型就是N+1,非CPU密集型就是2N,因此我们更希望看到线程池动态配置。

如何动态调整线程池参数

关于如何动态调整线程池,还是回到我们场景问题的解决上,对于流量突增核心就是提升线程池的处理速度,那如何提升线程池的处理速度,有两种方式,一种是加快业务的处理,也就是消费的快,显然这种在运行的业务中我们想改变还是比较困难,这个可以作为复盘的重点;还有一种就是增加消费者,增加消费者的重点就是调整核心线程数以及非核心线程数的数量。

img

居于这种思考,这个时候我们需要看下ThreadPoolExecutor线程池源码,首先看下开始定义的变量,通过变量的设计我们就会发现大师就是大师,大师通过AtomicInteger修饰的ctl变量,高3位存储了线程池的状态,低29存储线程的个数,通过一个变量完成两件事情,完成状态判断以及限制线程最大个数。使用一个HashSet存储Worker的引用,而Worker继承了AbstractQueuedSynchronizer,实现一个一个不可冲入的独占锁保证线程的安全性。

img

//用来标记线程池状态(高3位),线程个数(低29位)
privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));
//工作状态存储在高3位中
privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;
//线程个数所能表达的最大数值
privatestaticfinalintCAPACITY=(1<<COUNT_BITS)-1;
//线程池状态
//RUNNING-1能够接收新任务,也可以处理阻塞队列中的任务
privatestaticfinalintRUNNING=-1<<COUNT_BITS;
//SHUTDOWN0不可以接受新任务,继续处理阻塞队列中的任务
privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;
//STOP1不接收新任务,不处理阻塞队列中的任务,并且会中断正在处理的任务
privatestaticfinalintSTOP=1<<COUNT_BITS;
//TIDYING2所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法;
privatestaticfinalintTIDYING=2<<COUNT_BITS;
//TERMINATED3中止状态,已经执行完terminated()钩子方法
privatestaticfinalintTERMINATED=3<<COUNT_BITS;
//任务队列,当线程池中的线程达到核心线程数量时,再提交任务就会直接提交到workQueue
privatefinalBlockingQueue<Runnable>workQueue;
//线程池全局锁,增加worker减少worker时需要持有mainLock,修改线程池运行状态时,也需要
privatefinalReentrantLockmainLock=newReentrantLock();
//线程池中真正存放worker的地方。
privatefinalHashSet<Worker>workers=newHashSet<Worker>();
privatefinalConditiontermination=mainLock.newCondition();
//记录线程池生命周期内线程数最大值
privateintlargestPoolSize;
//记录线程池所完成任务总数
privatelongcompletedTaskCount;
//创建线程会使用线程工厂
privatevolatileThreadFactorythreadFactory;
//拒绝策略
privatevolatileRejectedExecutionHandlerhandler;
//存活时间
privatevolatilelongkeepAliveTime;
//控制核心线程数量内的线程是否可以被回收。true可以,false不可以。
privatevolatilebooleanallowCoreThreadTimeOut;
//核心线程池数量
privatevolatileintcorePoolSize;
//线程池最大数量
privatevolatileintmaximumPoolSize;

我们的重点看的是volatile修饰的corePoolSize、maximumPoolSize以及keepAliveTime,当然threadFactory和handler也可以看下,不过这两个不是我们解决动态调整线程池的关键。对于这些volatile修饰的关键的变量,从并发角度思考的,必然是有并发读写的操作才使用volatile修饰的,在指标采集中我们看到其get的方法,对于写的操作我们可以猜测肯定提供了set的方式,这个时候我们可以搜索下setCorePoolSize,果不其然我们真的搜索到了。

publicvoidsetCorePoolSize(intcorePoolSize){
if(corePoolSize<0)
thrownewIllegalArgumentException();
intdelta=corePoolSize-this.corePoolSize;
this.corePoolSize=corePoolSize;
//新设置的corePoolSize小于当前核心线程数的时候
//会调用interruptIdleWorkers方法来中断空闲的工作线程
if(workerCountOf(ctl.get())>corePoolSize)
interruptIdleWorkers();
elseif(delta>0){
//当设置的值大于当前值的时候核心线程数的时候
//按照等待队列中的任务数量来创建新的工作线程
intk=Math.min(delta,workQueue.size());
while(k-->0&&addWorker(null,true)){
if(workQueue.isEmpty())
break;
}
}
}

接下来我们看下interruptIdleWorkers的源码,此处源码使用ReentrantLock可重入锁,因为Worker的是通过一个全局的HashSer存储,这里通过ReentrantLock保证线程安全。

privatevoidinterruptIdleWorkers(booleanonlyOne){
//可重入锁
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
for(Workerw:workers){
Threadt=w.thread;
if(!t.isInterrupted()&&w.tryLock()){
try{
//中断当前线程
t.interrupt();
}catch(SecurityExceptionignore){
}finally{
w.unlock();
}
}
if(onlyOne)
break;
}
}finally{
mainLock.unlock();
}
}

接下来我们在验证一下是否存在其他相关的参数设置,如下:

publicvoidsetMaximumPoolSize(intmaximumPoolSize){
if(maximumPoolSize<=0||maximumPoolSize<corePoolSize)
thrownewIllegalArgumentException();
this.maximumPoolSize=maximumPoolSize;
if(workerCountOf(ctl.get())>maximumPoolSize)
interruptIdleWorkers();
}
publicvoidsetKeepAliveTime(longtime,TimeUnitunit){
if(time<0)
thrownewIllegalArgumentException();
if(time==0&&allowsCoreThreadTimeOut())
thrownewIllegalArgumentException("Corethreadsmusthavenonzerokeepalivetimes");
longkeepAliveTime=unit.toNanos(time);
longdelta=keepAliveTime-this.keepAliveTime;
this.keepAliveTime=keepAliveTime;
if(delta<0)
interruptIdleWorkers();
}
publicvoidsetRejectedExecutionHandler(RejectedExecutionHandlerhandler){
if(handler==null)
thrownewNullPointerException();
this.handler=handler;
}

这里我们会发现一个问题BlockingQueue的队列容量不能修改,看到美团的文章提供的一个可修改的队列ResizableCapacityLinkedBlockingQueue,于是乎去看了一下LinkedBlockingQueue的源码,发现了关于capacity是一个final修饰的,这个时候我就思考一番,这个地方采用volatile修饰,对外暴露可修改,这样就实现了动态修改阻塞队列的大小。

img
如何给不同的服务之间做线程池的隔离
img

关于如何给不同服务之间做线程池的隔离,这里我们可以采用Hystrix的舱壁模式,也就是说针对不同服务类型的服务单独创建线程池,这样就可以实现服务之间不相互影响,不会因为某个服务导致整体的服务影响都阻塞。

实现方案

聊了这么多前置的知识储备,接下来我们来聊聊实现方案,整体的实现方案我们建立在Spring Boot的基础实现,采用Spring Cloud刷新动态配置,采用该方式比较合适单体应用,对于有Appllo和Nacos可以通过监听配置方式的来动态刷新。

  1. Maven依赖如下;

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>

</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

  1. 配置信息如下:

monitor.threadpool.executors[0].thread-pool-name=first-monitor-thread-pool
monitor.threadpool.executors[0].core-pool-size=4
monitor.threadpool.executors[0].max-pool-size=8
monitor.threadpool.executors[0].queue-capacity=100

monitor.threadpool.executors[1].thread-pool-name=second-monitor-thread-pool
monitor.threadpool.executors[1].core-pool-size=2
monitor.threadpool.executors[1].max-pool-size=4
monitor.threadpool.executors[1].queue-capacity=40

/**
*线程池配置
*
*@authorwangtongzhou
*@since2022-03-1121:41
*/
@Data
publicclassThreadPoolProperties{

/**
*线程池名称
*/
privateStringthreadPoolName;

/**
*核心线程数
*/
privateIntegercorePoolSize=Runtime.getRuntime().availableProcessors();

/**
*最大线程数
*/
privateIntegermaxPoolSize;

/**
*队列最大数量
*/
privateIntegerqueueCapacity;

/**
*拒绝策略
*/
privateStringrejectedExecutionType="AbortPolicy";

/**
*空闲线程存活时间
*/
privateLongkeepAliveTime=1L;

/**
*空闲线程存活时间单位
*/
privateTimeUnitunit=TimeUnit.MILLISECONDS;


}


/**
*动态刷新线程池配置
*
*@authorwangtongzhou
*@since2022-03-1314:09
*/
@ConfigurationProperties(prefix="monitor.threadpool")
@Data
@Component
publicclassDynamicThreadPoolProperties{

privateList<ThreadPoolProperties>executors;
}

  1. 自定可修改阻塞队列大小的方式如下:

/**
*可重新设定队列大小的阻塞队列
*
*@authorwangtongzhou
*@since2022-03-1311:54
*/
publicclassResizableCapacityLinkedBlockingQueue<E>extendsAbstractQueue<E>
implementsBlockingDeque<E>,java.io.Serializable{
/*
*Implementedasasimpledoubly-linkedlistprotectedbya
*singlelockandusingconditionstomanageblocking.
*
*Toimplementweaklyconsistentiterators,itappearsweneedto
*keepallNodesGC-reachablefromapredecessordequeuedNode.
*Thatwouldcausetwoproblems:
*-allowarogueIteratortocauseunboundedmemoryretention
*-causecross-generationallinkingofoldNodestonewNodesif
*aNodewastenuredwhilelive,whichgenerationalGCshavea
*hardtimedealingwith,causingrepeatedmajorcollections.
*However,onlynon-deletedNodesneedtobereachablefrom
*dequeuedNodes,andreachabilitydoesnotnecessarilyhaveto
*beofthekindunderstoodbytheGC.Weusethetrickof
*linkingaNodethathasjustbeendequeuedtoitself.Sucha
*self-linkimplicitlymeanstojumpto"first"(fornextlinks)
*or"last"(forprevlinks).
*/

/*
*Wehave"diamond"multipleinterface/abstractclassinheritance
*here,andthatintroducesambiguities.Oftenwewantthe
*BlockingDequejavadoccombinedwiththeAbstractQueue
*implementation,soalotofmethodspecsareduplicatedhere.
*/

privatestaticfinallongserialVersionUID=-387911632671998426L;

/**
*Doubly-linkedlistnodeclass
*/
staticfinalclassNode<E>{
/**
*Theitem,ornullifthisnodehasbeenremoved.
*/
Eitem;

/**
*Oneof:
*-therealpredecessorNode
*-thisNode,meaningthepredecessoristail
*-null,meaningthereisnopredecessor
*/
Node<E>prev;

/**
*Oneof:
*-therealsuccessorNode
*-thisNode,meaningthesuccessorishead
*-null,meaningthereisnosuccessor
*/
Node<E>next;

Node(Ex){
item=x;
}
}

/**
*Pointertofirstnode.
*Invariant:(first==null&&last==null)||
*(first.prev==null&&first.item!=null)
*/
transientNode<E>first;

/**
*Pointertolastnode.
*Invariant:(first==null&&last==null)||
*(last.next==null&&last.item!=null)
*/
transientNode<E>last;

/**
*Numberofitemsinthedeque
*/
privatetransientintcount;

/**
*Maximumnumberofitemsinthedeque
*/
privatevolatileintcapacity;

publicintgetCapacity(){
returncapacity;
}

publicvoidsetCapacity(intcapacity){
this.capacity=capacity;
}

/**
*Mainlockguardingallaccess
*/
finalReentrantLocklock=newReentrantLock();

/**
*Conditionforwaitingtakes
*/
privatefinalConditionnotEmpty=lock.newCondition();

/**
*Conditionforwaitingputs
*/
privatefinalConditionnotFull=lock.newCondition();

/**
*Createsa{@codeResizableCapacityLinkedBlockIngQueue}withacapacityof
*{@linkInteger#MAX_VALUE}.
*/
publicResizableCapacityLinkedBlockingQueue(){
this(Integer.MAX_VALUE);
}

/**
*Createsa{@codeResizableCapacityLinkedBlockIngQueue}withthegiven(fixed)capacity.
*
*@paramcapacitythecapacityofthisdeque
*@throwsIllegalArgumentExceptionif{@codecapacity}islessthan1
*/
publicResizableCapacityLinkedBlockingQueue(intcapacity){
if(capacity<=0){
thrownewIllegalArgumentException();
}
this.capacity=capacity;
}

/**
*Createsa{@codeResizableCapacityLinkedBlockIngQueue}withacapacityof
*{@linkInteger#MAX_VALUE},initiallycontainingtheelementsof
*thegivencollection,addedintraversalorderofthe
*collection'siterator.
*
*@paramcthecollectionofelementstoinitiallycontain
*@throwsNullPointerExceptionifthespecifiedcollectionorany
*ofitselementsarenull
*/
publicResizableCapacityLinkedBlockingQueue(Collection<?extendsE>c){
this(Integer.MAX_VALUE);
finalReentrantLocklock=this.lock;
lock.lock();//Nevercontended,butnecessaryforvisibility
try{
for(Ee:c){
if(e==null){
thrownewNullPointerException();
}
if(!linkLast(newNode<E>(e))){
thrownewIllegalStateException("Dequefull");
}
}
}finally{
lock.unlock();
}
}


//Basiclinkingandunlinkingoperations,calledonlywhileholdinglock

/**
*Linksnodeasfirstelement,orreturnsfalseiffull.
*/
privatebooleanlinkFirst(Node<E>node){
//assertlock.isHeldByCurrentThread();
if(count>=capacity){
returnfalse;
}
Node<E>f=first;
node.next=f;
first=node;
if(last==null){
last=node;
}else{
f.prev=node;
}
++count;
notEmpty.signal();
returntrue;
}

/**
*Linksnodeaslastelement,orreturnsfalseiffull.
*/
privatebooleanlinkLast(Node<E>node){
//assertlock.isHeldByCurrentThread();
if(count>=capacity){
returnfalse;
}
Node<E>l=last;
node.prev=l;
last=node;
if(first==null){
first=node;
}else{
l.next=node;
}
++count;
notEmpty.signal();
returntrue;
}

/**
*Removesandreturnsfirstelement,ornullifempty.
*/
privateEunlinkFirst(){
//assertlock.isHeldByCurrentThread();
Node<E>f=first;
if(f==null){
returnnull;
}
Node<E>n=f.next;
Eitem=f.item;
f.item=null;
f.next=f;//helpGC
first=n;
if(n==null){
last=null;
}else{
n.prev=null;
}
--count;
notFull.signal();
returnitem;
}

/**
*Removesandreturnslastelement,ornullifempty.
*/
privateEunlinkLast(){
//assertlock.isHeldByCurrentThread();
Node<E>l=last;
if(l==null){
returnnull;
}
Node<E>p=l.prev;
Eitem=l.item;
l.item=null;
l.prev=l;//helpGC
last=p;
if(p==null){
first=null;
}else{
p.next=null;
}
--count;
notFull.signal();
returnitem;
}

/**
*Unlinksx.
*/
voidunlink(Node<E>x){
//assertlock.isHeldByCurrentThread();
Node<E>p=x.prev;
Node<E>n=x.next;
if(p==null){
unlinkFirst();
}elseif(n==null){
unlinkLast();
}else{
p.next=n;
n.prev=p;
x.item=null;
//Don'tmesswithx'slinks.Theymaystillbeinuseby
//aniterator.
--count;
notFull.signal();
}
}

//BlockingDequemethods

/**
*@throwsIllegalStateExceptionifthisdequeisfull
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicvoidaddFirst(Ee){
if(!offerFirst(e)){
thrownewIllegalStateException("Dequefull");
}
}

/**
*@throwsIllegalStateExceptionifthisdequeisfull
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicvoidaddLast(Ee){
if(!offerLast(e)){
thrownewIllegalStateException("Dequefull");
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicbooleanofferFirst(Ee){
if(e==null){
thrownewNullPointerException();
}
Node<E>node=newNode<E>(e);
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnlinkFirst(node);
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicbooleanofferLast(Ee){
if(e==null)thrownewNullPointerException();
Node<E>node=newNode<E>(e);
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnlinkLast(node);
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicvoidputFirst(Ee)throwsInterruptedException{
if(e==null){
thrownewNullPointerException();
}
Node<E>node=newNode<E>(e);
finalReentrantLocklock=this.lock;
lock.lock();
try{
while(!linkFirst(node)){
notFull.await();
}
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicvoidputLast(Ee)throwsInterruptedException{
if(e==null){
thrownewNullPointerException();
}
Node<E>node=newNode<E>(e);
finalReentrantLocklock=this.lock;
lock.lock();
try{
while(!linkLast(node)){
notFull.await();
}
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicbooleanofferFirst(Ee,longtimeout,TimeUnitunit)
throwsInterruptedException{
if(e==null){
thrownewNullPointerException();
}
Node<E>node=newNode<E>(e);
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
while(!linkFirst(node)){
if(nanos<=0){
returnfalse;
}
nanos=notFull.awaitNanos(nanos);
}
returntrue;
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicbooleanofferLast(Ee,longtimeout,TimeUnitunit)
throwsInterruptedException{
if(e==null)thrownewNullPointerException();
Node<E>node=newNode<E>(e);
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
while(!linkLast(node)){
if(nanos<=0){
returnfalse;
}
nanos=notFull.awaitNanos(nanos);
}
returntrue;
}finally{
lock.unlock();
}
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEremoveFirst(){
Ex=pollFirst();
if(x==null){
thrownewNoSuchElementException();
}
returnx;
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEremoveLast(){
Ex=pollLast();
if(x==null){
thrownewNoSuchElementException();
}
returnx;
}

@Override
publicEpollFirst(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnunlinkFirst();
}finally{
lock.unlock();
}
}

@Override
publicEpollLast(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnunlinkLast();
}finally{
lock.unlock();
}
}

@Override
publicEtakeFirst()throwsInterruptedException{
finalReentrantLocklock=this.lock;
lock.lock();
try{
Ex;
while((x=unlinkFirst())==null){
notEmpty.await();
}
returnx;
}finally{
lock.unlock();
}
}

@Override
publicEtakeLast()throwsInterruptedException{
finalReentrantLocklock=this.lock;
lock.lock();
try{
Ex;
while((x=unlinkLast())==null){
notEmpty.await();
}
returnx;
}finally{
lock.unlock();
}
}

@Override
publicEpollFirst(longtimeout,TimeUnitunit)
throwsInterruptedException{
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
Ex;
while((x=unlinkFirst())==null){
if(nanos<=0){
returnnull;
}
nanos=notEmpty.awaitNanos(nanos);
}
returnx;
}finally{
lock.unlock();
}
}

@Override
publicEpollLast(longtimeout,TimeUnitunit)
throwsInterruptedException{
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
Ex;
while((x=unlinkLast())==null){
if(nanos<=0){
returnnull;
}
nanos=notEmpty.awaitNanos(nanos);
}
returnx;
}finally{
lock.unlock();
}
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEgetFirst(){
Ex=peekFirst();
if(x==null){
thrownewNoSuchElementException();
}
returnx;
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEgetLast(){
Ex=peekLast();
if(x==null){
thrownewNoSuchElementException();
}
returnx;
}

@Override
publicEpeekFirst(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
return(first==null)?null:first.item;
}finally{
lock.unlock();
}
}

@Override
publicEpeekLast(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
return(last==null)?null:last.item;
}finally{
lock.unlock();
}
}

@Override
publicbooleanremoveFirstOccurrence(Objecto){
if(o==null){
returnfalse;
}
finalReentrantLocklock=this.lock;
lock.lock();
try{
for(Node<E>p=first;p!=null;p=p.next){
if(o.equals(p.item)){
unlink(p);
returntrue;
}
}
returnfalse;
}finally{
lock.unlock();
}
}

@Override
publicbooleanremoveLastOccurrence(Objecto){
if(o==null){
returnfalse;
}
finalReentrantLocklock=this.lock;
lock.lock();
try{
for(Node<E>p=last;p!=null;p=p.prev){
if(o.equals(p.item)){
unlink(p);
returntrue;
}
}
returnfalse;
}finally{
lock.unlock();
}
}

//BlockingQueuemethods

/**
*Insertsthespecifiedelementattheendofthisdequeunlessitwould
*violatecapacityrestrictions.Whenusingacapacity-restricteddeque,
*itisgenerallypreferabletousemethod{@link#offer(Object)offer}.
*
*<p>Thismethodisequivalentto{@link#addLast}.
*
*@throwsIllegalStateExceptionifthisdequeisfull
*@throwsNullPointerExceptionifthespecifiedelementisnull
*/
@Override
publicbooleanadd(Ee){
addLast(e);
returntrue;
}

/**
*@throwsNullPointerExceptionifthespecifiedelementisnull
*/
@Override
publicbooleanoffer(Ee){
returnofferLast(e);
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicvoidput(Ee)throwsInterruptedException{
putLast(e);
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicbooleanoffer(Ee,longtimeout,TimeUnitunit)
throwsInterruptedException{
returnofferLast(e,timeout,unit);
}

/**
*Retrievesandremovestheheadofthequeuerepresentedbythisdeque.
*Thismethoddiffersfrom{@link#pollpoll}onlyinthatitthrowsan
*exceptionifthisdequeisempty.
*
*<p>Thismethodisequivalentto{@link#removeFirst()removeFirst}.
*
*@returntheheadofthequeuerepresentedbythisdeque
*@throwsNoSuchElementExceptionifthisdequeisempty
*/
@Override
publicEremove(){
returnremoveFirst();
}

@Override
publicEpoll(){
returnpollFirst();
}

@Override
publicEtake()throwsInterruptedException{
returntakeFirst();
}

@Override
publicEpoll(longtimeout,TimeUnitunit)throwsInterruptedException{
returnpollFirst(timeout,unit);
}

/**
*Retrieves,butdoesnotremove,theheadofthequeuerepresentedby
*thisdeque.Thismethoddiffersfrom{@link#peekpeek}onlyinthat
*itthrowsanexceptionifthisdequeisempty.
*
*<p>Thismethodisequivalentto{@link#getFirst()getFirst}.
*
*@returntheheadofthequeuerepresentedbythisdeque
*@throwsNoSuchElementExceptionifthisdequeisempty
*/
@Override
publicEelement(){
returngetFirst();
}

@Override
publicEpeek(){
returnpeekFirst();
}

/**
*Returnsthenumberofadditionalelementsthatthisdequecanideally
*(intheabsenceofmemoryorresourceconstraints)acceptwithout
*blocking.Thisisalwaysequaltotheinitialcapacityofthisdeque
*lessthecurrent{@codesize}ofthisdeque.
*
*<p>Notethatyou<em>cannot</em>alwaystellifanattempttoinsert
*anelementwillsucceedbyinspecting{@coderemainingCapacity}
*becauseitmaybethecasethatanotherthreadisaboutto
*insertorremoveanelement.
*/
@Override
publicintremainingCapacity(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returncapacity-count;
}finally{
lock.unlock();
}
}

/**
*@throwsUnsupportedOperationException{@inheritDoc}
*@throwsClassCastException{@inheritDoc}
*@throwsNullPointerException{@inheritDoc}
*@throwsIllegalArgumentException{@inheritDoc}
*/
@Override
publicintdrainTo(Collection<?superE>c){
returndrainTo(c,Integer.MAX_VALUE);
}

/**
*@throwsUnsupportedOperationException{@inheritDoc}
*@throwsClassCastException{@inheritDoc}
*@throwsNullPointerException{@inheritDoc}
*@throwsIllegalArgumentException{@inheritDoc}
*/
@Override
publicintdrainTo(Collection<?superE>c,intmaxElements){
if(c==null){
thrownewNullPointerException();
}
if(c==this){
thrownewIllegalArgumentException();
}
if(maxElements<=0){
return0;
}
finalReentrantLocklock=this.lock;
lock.lock();
try{
intn=Math.min(maxElements,count);
for(inti=0;i<n;i++){
c.add(first.item);//Inthisorder,incaseadd()throws.
unlinkFirst();
}
returnn;
}finally{
lock.unlock();
}
}

//Stackmethods

/**
*@throwsIllegalStateExceptionifthisdequeisfull
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicvoidpush(Ee){
addFirst(e);
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEpop(){
returnremoveFirst();
}

//Collectionmethods

/**
*Removesthefirstoccurrenceofthespecifiedelementfromthisdeque.
*Ifthedequedoesnotcontaintheelement,itisunchanged.
*Moreformally,removesthefirstelement{@codee}suchthat
*{@codeo.equals(e)}(ifsuchanelementexists).
*Returns{@codetrue}ifthisdequecontainedthespecifiedelement
*(orequivalently,ifthisdequechangedasaresultofthecall).
*
*<p>Thismethodisequivalentto
*{@link#removeFirstOccurrence(Object)removeFirstOccurrence}.
*
*@paramoelementtoberemovedfromthisdeque,ifpresent
*@return{@codetrue}ifthisdequechangedasaresultofthecall
*/
@Override
publicbooleanremove(Objecto){
returnremoveFirstOccurrence(o);
}

/**
*Returnsthenumberofelementsinthisdeque.
*
*@returnthenumberofelementsinthisdeque
*/
@Override
publicintsize(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returncount;
}finally{
lock.unlock();
}
}

/**
*Returns{@codetrue}ifthisdequecontainsthespecifiedelement.
*Moreformally,returns{@codetrue}ifandonlyifthisdequecontains
*atleastoneelement{@codee}suchthat{@codeo.equals(e)}.
*
*@paramoobjecttobecheckedforcontainmentinthisdeque
*@return{@codetrue}ifthisdequecontainsthespecifiedelement
*/
@Override
publicbooleancontains(Objecto){
if(o==null){
returnfalse;
}
finalReentrantLocklock=this.lock;
lock.lock();
try{
for(Node<E>p=first;p!=null;p=p.next){
if(o.equals(p.item)){
returntrue;
}
}
returnfalse;
}finally{
lock.unlock();
}
}

/*
*TODO:Addsupportformoreefficientbulkoperations.
*
*Wedon'twanttoacquirethelockforeveryiteration,butwe
*alsowantotherthreadsachancetointeractwiththe
*collection,especiallywhencountisclosetocapacity.
*/

///**
//*Addsalloftheelementsinthespecifiedcollectiontothis
//*queue.AttemptstoaddAllofaqueuetoitselfresultin
//*{@codeIllegalArgumentException}.Further,thebehaviorof
//*thisoperationisundefinedifthespecifiedcollectionis
//*modifiedwhiletheoperationisinprogress.
//*
//*@paramccollectioncontainingelementstobeaddedtothisqueue
//*@return{@codetrue}ifthisqueuechangedasaresultofthecall
//*@throwsClassCastException{@inheritDoc}
//*@throwsNullPointerException{@inheritDoc}
//*@throwsIllegalArgumentException{@inheritDoc}
//*@throwsIllegalStateExceptionifthisdequeisfull
//*@see#add(Object)
//*/
//publicbooleanaddAll(Collection<?extendsE>c){
//if(c==null)
//thrownewNullPointerException();
//if(c==this)
//thrownewIllegalArgumentException();
//finalReentrantLocklock=this.lock;
//lock.lock();
//try{
//booleanmodified=false;
//for(Ee:c)
//if(linkLast(e))
//modified=true;
//returnmodified;
//}finally{
//lock.unlock();
//}
//}

/**
*Returnsanarraycontainingalloftheelementsinthisdeque,in
*propersequence(fromfirsttolastelement).
*
*<p>Thereturnedarraywillbe"safe"inthatnoreferencestoitare
*maintainedbythisdeque.(Inotherwords,thismethodmustallocate
*anewarray).Thecalleristhusfreetomodifythereturnedarray.
*
*<p>Thismethodactsasbridgebetweenarray-basedandcollection-based
*APIs.
*
*@returnanarraycontainingalloftheelementsinthisdeque
*/
@Override
@SuppressWarnings("unchecked")
publicObject[]toArray(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
Object[]a=newObject[count];
intk=0;
for(Node<E>p=first;p!=null;p=p.next){
a[k++]=p.item;
}
returna;
}finally{
lock.unlock();
}
}

/**
*Returnsanarraycontainingalloftheelementsinthisdeque,in
*propersequence;theruntimetypeofthereturnedarrayisthatof
*thespecifiedarray.Ifthedequefitsinthespecifiedarray,it
*isreturnedtherein.Otherwise,anewarrayisallocatedwiththe
*runtimetypeofthespecifiedarrayandthesizeofthisdeque.
*
*<p>Ifthisdequefitsinthespecifiedarraywithroomtospare
*(i.e.,thearrayhasmoreelementsthanthisdeque),theelementin
*thearrayimmediatelyfollowingtheendofthedequeissetto
*{@codenull}.
*
*<p>Likethe{@link#toArray()}method,thismethodactsasbridgebetween
*array-basedandcollection-basedAPIs.Further,thismethodallows
*precisecontrolovertheruntimetypeoftheoutputarray,andmay,
*undercertaincircumstances,beusedtosaveallocationcosts.
*
*<p>Suppose{@codex}isadequeknowntocontainonlystrings.
*Thefollowingcodecanbeusedtodumpthedequeintoanewly
*allocatedarrayof{@codeString}:
*
*<pre>{@codeString[]y=x.toArray(newString[0]);}</pre>
*<p>
*Notethat{@codetoArray(newObject[0])}isidenticalinfunctionto
*{@codetoArray()}.
*
*@paramathearrayintowhichtheelementsofthedequeareto
*bestored,ifitisbigenough;otherwise,anewarrayofthe
*sameruntimetypeisallocatedforthispurpose
*@returnanarraycontainingalloftheelementsinthisdeque
*@throwsArrayStoreExceptioniftheruntimetypeofthespecifiedarray
*isnotasupertypeoftheruntimetypeofeveryelementin
*thisdeque
*@throwsNullPointerExceptionifthespecifiedarrayisnull
*/
@Override
@SuppressWarnings("unchecked")
public<T>T[]toArray(T[]a){
finalReentrantLocklock=this.lock;
lock.lock();
try{
if(a.length<count){
a=(T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(),count);
}
intk=0;
for(Node<E>p=first;p!=null;p=p.next){
a[k++]=(T)p.item;
}
if(a.length>k){
a[k]=null;
}
returna;
}finally{
lock.unlock();
}
}

@Override
publicStringtoString(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
Node<E>p=first;
if(p==null){
return"[]";
}
StringBuildersb=newStringBuilder();
sb.append('[');
for(;;){
Ee=p.item;
sb.append(e==this?"(thisCollection)":e);
p=p.next;
if(p==null){
returnsb.append(']').toString();
}
sb.append(',').append('');
}
}finally{
lock.unlock();
}
}

/**
*Atomicallyremovesalloftheelementsfromthisdeque.
*Thedequewillbeemptyafterthiscallreturns.
*/
@Override
publicvoidclear(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
for(Node<E>f=first;f!=null;){
f.item=null;
Node<E>n=f.next;
f.prev=null;
f.next=null;
f=n;
}
first=last=null;
count=0;
notFull.signalAll();
}finally{
lock.unlock();
}
}

/**
*Returnsaniteratorovertheelementsinthisdequeinpropersequence.
*Theelementswillbereturnedinorderfromfirst(head)tolast(tail).
*
*<p>Thereturnediteratoris
*<ahref="package-summary.html#Weakly"><i>weaklyconsistent</i></a>.
*
*@returnaniteratorovertheelementsinthisdequeinpropersequence
*/
@Override
publicIterator<E>iterator(){
returnnewItr();
}

/**
*Returnsaniteratorovertheelementsinthisdequeinreverse
*sequentialorder.Theelementswillbereturnedinorderfrom
*last(tail)tofirst(head).
*
*<p>Thereturnediteratoris
*<ahref="package-summary.html#Weakly"><i>weaklyconsistent</i></a>.
*
*@returnaniteratorovertheelementsinthisdequeinreverseorder
*/
@Override
publicIterator<E>descendingIterator(){
returnnewDescendingItr();
}

/**
*BaseclassforIteratorsforResizableCapacityLinkedBlockIngQueue
*/
privateabstractclassAbstractItrimplementsIterator<E>{
/**
*Thenextnodetoreturninnext()
*/
Node<E>next;

/**
*nextItemholdsontoitemfieldsbecauseonceweclaimthat
*anelementexistsinhasNext(),wemustreturnitemread
*underlock(inadvance())evenifitwasintheprocessof
*beingremovedwhenhasNext()wascalled.
*/
EnextItem;

/**
*Nodereturnedbymostrecentcalltonext.Neededbyremove.
*Resettonullifthiselementisdeletedbyacalltoremove.
*/
privateNode<E>lastRet;

abstractNode<E>firstNode();

abstractNode<E>nextNode(Node<E>n);

AbstractItr(){
//settoinitialposition
finalReentrantLocklock=ResizableCapacityLinkedBlockingQueue.this.lock;
lock.lock();
try{
next=firstNode();
nextItem=(next==null)?null:next.item;
}finally{
lock.unlock();
}
}

/**
*Returnsthesuccessornodeofthegivennon-null,but
*possiblypreviouslydeleted,node.
*/
privateNode<E>succ(Node<E>n){
//Chainsofdeletednodesendinginnullorself-links
//arepossibleifmultipleinteriornodesareremoved.
for(;;){
Node<E>s=nextNode(n);
if(s==null){
returnnull;
}elseif(s.item!=null){
returns;
}elseif(s==n){
returnfirstNode();
}else{
n=s;
}
}
}

/**
*Advancesnext.
*/
voidadvance(){
finalReentrantLocklock=ResizableCapacityLinkedBlockingQueue.this.lock;
lock.lock();
try{
//assertnext!=null;
next=succ(next);
nextItem=(next==null)?null:next.item;
}finally{
lock.unlock();
}
}

@Override
publicbooleanhasNext(){
returnnext!=null;
}

@Override
publicEnext(){
if(next==null){
thrownewNoSuchElementException();
}
lastRet=next;
Ex=nextItem;
advance();
returnx;
}

@Override
publicvoidremove(){
Node<E>n=lastRet;
if(n==null){
thrownewIllegalStateException();
}
lastRet=null;
finalReentrantLocklock=ResizableCapacityLinkedBlockingQueue.this.lock;
lock.lock();
try{
if(n.item!=null){
unlink(n);
}
}finally{
lock.unlock();
}
}
}

/**
*Forwarditerator
*/
privateclassItrextendsAbstractItr{
@Override
Node<E>firstNode(){
returnfirst;
}

@Override
Node<E>nextNode(Node<E>n){
returnn.next;
}
}

/**
*Descendingiterator
*/
privateclassDescendingItrextendsAbstractItr{
@Override
Node<E>firstNode(){
returnlast;
}

@Override
Node<E>nextNode(Node<E>n){
returnn.prev;
}
}

/**
*AcustomizedvariantofSpliterators.IteratorSpliterator
*/
staticfinalclassLBDSpliterator<E>implementsSpliterator<E>{
staticfinalintMAX_BATCH=1<<25;//maxbatcharraysize;
finalResizableCapacityLinkedBlockingQueue<E>queue;
Node<E>current;//currentnode;nulluntilinitialized
intbatch;//batchsizeforsplits
booleanexhausted;//truewhennomorenodes
longest;//sizeestimate

LBDSpliterator(ResizableCapacityLinkedBlockingQueue<E>queue){
this.queue=queue;
this.est=queue.size();
}

@Override
publiclongestimateSize(){
returnest;
}

@Override
publicSpliterator<E>trySplit(){
Node<E>h;
finalResizableCapacityLinkedBlockingQueue<E>q=this.queue;
intb=batch;
intn=(b<=0)?1:(b>=MAX_BATCH)?MAX_BATCH:b+1;
if(!exhausted&&
((h=current)!=null||(h=q.first)!=null)&&
h.next!=null){
Object[]a=newObject[n];
finalReentrantLocklock=q.lock;
inti=0;
Node<E>p=current;
lock.lock();
try{
if(p!=null||(p=q.first)!=null){
do{
if((a[i]=p.item)!=null){
++i;
}
}while((p=p.next)!=null&&i<n);
}
}finally{
lock.unlock();
}
if((current=p)==null){
est=0L;
exhausted=true;
}elseif((est-=i)<0L){
est=0L;
}
if(i>0){
batch=i;
returnSpliterators.spliterator
(a,0,i,Spliterator.ORDERED|Spliterator.NONNULL|
Spliterator.CONCURRENT);
}
}
returnnull;
}

@Override
publicvoidforEachRemaining(Consumer<?superE>action){
if(action==null){
thrownewNullPointerException();
}
finalResizableCapacityLinkedBlockingQueue<E>q=this.queue;
finalReentrantLocklock=q.lock;
if(!exhausted){
exhausted=true;
Node<E>p=current;
do{
Ee=null;
lock.lock();
try{
if(p==null){
p=q.first;
}
while(p!=null){
e=p.item;
p=p.next;
if(e!=null){
break;
}
}
}finally{
lock.unlock();
}
if(e!=null){
action.accept(e);
}
}while(p!=null);
}
}

@Override
publicbooleantryAdvance(Consumer<?superE>action){
if(action==null){
thrownewNullPointerException();
}
finalResizableCapacityLinkedBlockingQueue<E>q=this.queue;
finalReentrantLocklock=q.lock;
if(!exhausted){
Ee=null;
lock.lock();
try{
if(current==null){
current=q.first;
}
while(current!=null){
e=current.item;
current=current.next;
if(e!=null){
break;
}
}
}finally{
lock.unlock();
}
if(current==null){
exhausted=true;
}
if(e!=null){
action.accept(e);
returntrue;
}
}
returnfalse;
}

@Override
publicintcharacteristics(){
returnSpliterator.ORDERED|Spliterator.NONNULL|
Spliterator.CONCURRENT;
}
}

/**
*Returnsa{@linkSpliterator}overtheelementsinthisdeque.
*
*<p>Thereturnedspliteratoris
*<ahref="package-summary.html#Weakly"><i>weaklyconsistent</i></a>.
*
*<p>The{@codeSpliterator}reports{@linkSpliterator#CONCURRENT},
*{@linkSpliterator#ORDERED},and{@linkSpliterator#NONNULL}.
*
*@returna{@codeSpliterator}overtheelementsinthisdeque
*@implNoteThe{@codeSpliterator}implements{@codetrySplit}topermitlimited
*parallelism.
*@since1.8
*/
@Override
publicSpliterator<E>spliterator(){
returnnewLBDSpliterator<E>(this);
}

/**
*Savesthisdequetoastream(thatis,serializesit).
*
*@paramsthestream
*@throwsjava.io.IOExceptionifanI/Oerroroccurs
*@serialDataThecapacity(int),followedbyelements(eachan
*{@codeObject})intheproperorder,followedbyanull
*/
privatevoidwriteObject(java.io.ObjectOutputStreams)
throwsjava.io.IOException{
finalReentrantLocklock=this.lock;
lock.lock();
try{
//Writeoutcapacityandanyhiddenstuff
s.defaultWriteObject();
//Writeoutallelementsintheproperorder.
for(Node<E>p=first;p!=null;p=p.next){
s.writeObject(p.item);
}
//Usetrailingnullassentinel
s.writeObject(null);
}finally{
lock.unlock();
}
}

/**
*Reconstitutesthisdequefromastream(thatis,deserializesit).
*
*@paramsthestream
*@throwsClassNotFoundExceptioniftheclassofaserializedobject
*couldnotbefound
*@throwsjava.io.IOExceptionifanI/Oerroroccurs
*/
privatevoidreadObject(java.io.ObjectInputStreams)
throwsjava.io.IOException,ClassNotFoundException{
s.defaultReadObject();
count=0;
first=null;
last=null;
//Readinallelementsandplaceinqueue
for(;;){
@SuppressWarnings("unchecked")
Eitem=(E)s.readObject();
if(item==null){
break;
}
add(item);
}
}
}

  1. 自定义线程池,增加每个线程处理的耗时,以及平均耗时、最大耗时、最小耗时,以及输出监控日志信息等等;

/**
*线程池监控类
*
*@authorwangtongzhou
*@since2022-02-2307:27
*/
publicclassThreadPoolMonitorextendsThreadPoolExecutor{

privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(ThreadPoolMonitor.class);

/**
*默认拒绝策略
*/
privatestaticfinalRejectedExecutionHandlerdefaultHandler=newAbortPolicy();

/**
*线程池名称,一般以业务名称命名,方便区分
*/
privateStringpoolName;

/**
*最短执行时间
*/
privateLongminCostTime;

/**
*最长执行时间
*/
privateLongmaxCostTime;
/**
*总的耗时
*/
privateAtomicLongtotalCostTime=newAtomicLong();

privateThreadLocal<Long>startTimeThreadLocal=newThreadLocal<>();

/**
*调用父类的构造方法,并初始化HashMap和线程池名称
*
*@paramcorePoolSize线程池核心线程数
*@parammaximumPoolSize线程池最大线程数
*@paramkeepAliveTime线程的最大空闲时间
*@paramunit空闲时间的单位
*@paramworkQueue保存被提交任务的队列
*@parampoolName线程池名称
*/
publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueue<Runnable>workQueue,StringpoolName){
this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
Executors.defaultThreadFactory(),poolName);
}


/**
*调用父类的构造方法,并初始化HashMap和线程池名称
*
*@paramcorePoolSize线程池核心线程数
*@parammaximumPoolSize线程池最大线程数
*@paramkeepAliveTime线程的最大空闲时间
*@paramunit空闲时间的单位
*@paramworkQueue保存被提交任务的队列
*@param
*@parampoolName线程池名称
*/
publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueue<Runnable>workQueue,RejectedExecutionHandlerhandler,StringpoolName){
this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
Executors.defaultThreadFactory(),handler,poolName);
}


/**
*调用父类的构造方法,并初始化HashMap和线程池名称
*
*@paramcorePoolSize线程池核心线程数
*@parammaximumPoolSize线程池最大线程数
*@paramkeepAliveTime线程的最大空闲时间
*@paramunit空闲时间的单位
*@paramworkQueue保存被提交任务的队列
*@paramthreadFactory线程工厂
*@parampoolName线程池名称
*/
publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,StringpoolName){
super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,defaultHandler);
this.poolName=poolName;
}


/**
*调用父类的构造方法,并初始化HashMap和线程池名称
*
*@paramcorePoolSize线程池核心线程数
*@parammaximumPoolSize线程池最大线程数
*@paramkeepAliveTime线程的最大空闲时间
*@paramunit空闲时间的单位
*@paramworkQueue保存被提交任务的队列
*@paramthreadFactory线程工厂
*@paramhandler拒绝策略
*@parampoolName线程池名称
*/
publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,RejectedExecutionHandlerhandler,StringpoolName){
super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
this.poolName=poolName;
}


/**
*线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
*/
@Override
publicvoidshutdown(){
//统计已执行任务、正在执行任务、未执行任务数量
LOGGER.info("{}关闭线程池,已执行任务:{},正在执行任务:{},未执行任务数量:{}",
this.poolName,this.getCompletedTaskCount(),this.getActiveCount(),this.getQueue().size());
super.shutdown();
}

/**
*线程池立即关闭时,统计线程池情况
*/
@Override
publicList<Runnable>shutdownNow(){
//统计已执行任务、正在执行任务、未执行任务数量
LOGGER.info("{}立即关闭线程池,已执行任务:{},正在执行任务:{},未执行任务数量:{}",
this.poolName,this.getCompletedTaskCount(),this.getActiveCount(),this.getQueue().size());
returnsuper.shutdownNow();
}

/**
*任务执行之前,记录任务开始时间
*/
@Override
protectedvoidbeforeExecute(Threadt,Runnabler){
startTimeThreadLocal.set(System.currentTimeMillis());
}

/**
*任务执行之后,计算任务结束时间
*/
@Override
protectedvoidafterExecute(Runnabler,Throwablet){
longcostTime=System.currentTimeMillis()-startTimeThreadLocal.get();
startTimeThreadLocal.remove();
maxCostTime=maxCostTime>costTime?maxCostTime:costTime;
if(getCompletedTaskCount()==0){
minCostTime=costTime;
}
minCostTime=minCostTime<costTime?minCostTime:costTime;
totalCostTime.addAndGet(costTime);
LOGGER.info("{}-pool-monitor:"+
"任务耗时:{}ms,初始线程数:{},核心线程数:{},执行的任务数量:{},"+
"已完成任务数量:{},任务总数:{},队列里缓存的任务数量:{},池中存在的最大线程数:{},"+
"最大允许的线程数:{},线程空闲时间:{},线程池是否关闭:{},线程池是否终止:{}",
this.poolName,
costTime,this.getPoolSize(),this.getCorePoolSize(),this.getActiveCount(),
this.getCompletedTaskCount(),this.getTaskCount(),this.getQueue().size(),this.getLargestPoolSize(),
this.getMaximumPoolSize(),this.getKeepAliveTime(TimeUnit.MILLISECONDS),this.isShutdown(),this.isTerminated());
}


publicLonggetMinCostTime(){
returnminCostTime;
}

publicLonggetMaxCostTime(){
returnmaxCostTime;
}

publiclonggetAverageCostTime(){
if(getCompletedTaskCount()==0||totalCostTime.get()==0){
return0;
}
returntotalCostTime.get()/getCompletedTaskCount();
}

/**
*生成线程池所用的线程,改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪
*/
staticclassMonitorThreadFactoryimplementsThreadFactory{
privatestaticfinalAtomicIntegerpoolNumber=newAtomicInteger(1);
privatefinalThreadGroupgroup;
privatefinalAtomicIntegerthreadNumber=newAtomicInteger(1);
privatefinalStringnamePrefix;

/**
*初始化线程工厂
*
*@parampoolName线程池名称
*/
MonitorThreadFactory(StringpoolName){
SecurityManagers=System.getSecurityManager();
group=Objects.nonNull(s)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
namePrefix=poolName+"-pool-"+poolNumber.getAndIncrement()+"-thread-";
}

@Override
publicThreadnewThread(Runnabler){
Threadt=newThread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
if(t.isDaemon()){
t.setDaemon(false);
}
if(t.getPriority()!=Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
returnt;
}
}
}

  1. 动态修改线程池的类,通过Spring的监听器监控配置刷新方法,实现动态更新线程池的参数;

/**
*动态刷新线程池
*
*@authorwangtongzhou
*@since2022-03-1314:13
*/
@Component
@Slf4j
publicclassDynamicThreadPoolManager{


@Autowired
privateDynamicThreadPoolPropertiesdynamicThreadPoolProperties;

/**
*存储线程池对象
*/
publicMap<String,ThreadPoolMonitor>threadPoolExecutorMap=newHashMap<>();


publicMap<String,ThreadPoolMonitor>getThreadPoolExecutorMap(){
returnthreadPoolExecutorMap;
}


/**
*初始化线程池
*/
@PostConstruct
publicvoidinit(){
createThreadPools(dynamicThreadPoolProperties);
}

/**
*初始化线程池的创建
*
*@paramdynamicThreadPoolProperties
*/
privatevoidcreateThreadPools(DynamicThreadPoolPropertiesdynamicThreadPoolProperties){
dynamicThreadPoolProperties.getExecutors().forEach(config->{
if(!threadPoolExecutorMap.containsKey(config.getThreadPoolName())){
ThreadPoolMonitorthreadPoolMonitor=newThreadPoolMonitor(
config.getCorePoolSize(),
config.getMaxPoolSize(),
config.getKeepAliveTime(),
config.getUnit(),
newResizableCapacityLinkedBlockingQueue<>(config.getQueueCapacity()),
RejectedExecutionHandlerEnum.getRejectedExecutionHandler(config.getRejectedExecutionType()),
config.getThreadPoolName()
);
threadPoolExecutorMap.put(config.getThreadPoolName(),
threadPoolMonitor);
}

});
}

/**
*调整线程池
*
*@paramdynamicThreadPoolProperties
*/
privatevoidchangeThreadPools(DynamicThreadPoolPropertiesdynamicThreadPoolProperties){
dynamicThreadPoolProperties.getExecutors().forEach(config->{
ThreadPoolExecutorthreadPoolExecutor=threadPoolExecutorMap.get(config.getThreadPoolName());
if(Objects.nonNull(threadPoolExecutor)){
threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(config.getMaxPoolSize());
threadPoolExecutor.setKeepAliveTime(config.getKeepAliveTime(),config.getUnit());
threadPoolExecutor.setRejectedExecutionHandler(RejectedExecutionHandlerEnum.getRejectedExecutionHandler(config.getRejectedExecutionType()));
BlockingQueue<Runnable>queue=threadPoolExecutor.getQueue();
if(queueinstanceofResizableCapacityLinkedBlockingQueue){
((ResizableCapacityLinkedBlockingQueue<Runnable>)queue).setCapacity(config.getQueueCapacity());
}
}
});
}


@EventListener
publicvoidenvListener(EnvironmentChangeEventevent){
log.info("配置发生变更"+event);
changeThreadPools(dynamicThreadPoolProperties);
}

}

  1. DynamicThreadPoolPropertiesController对外暴露两个方法,第一个通过ContextRefresher提供对外刷新配置的接口,实现及时更新配置信息,第二提供一个查询接口的方法,

/**
*动态修改线程池参数
*
*@authorwangtongzhou
*@since2022-03-1317:27
*/
@RestController
publicclassDynamicThreadPoolPropertiesController{

@Autowired
privateContextRefreshercontextRefresher;


@Autowired
privateDynamicThreadPoolPropertiesdynamicThreadPoolProperties;


@Autowired
privateDynamicThreadPoolManagerdynamicThreadPoolManager;


@PostMapping("/threadPool/properties")
publicvoidupdate(){
ThreadPoolPropertiesthreadPoolProperties=
dynamicThreadPoolProperties.getExecutors().get(0);
threadPoolProperties.setCorePoolSize(20);
threadPoolProperties.setMaxPoolSize(50);
threadPoolProperties.setQueueCapacity(200);
threadPoolProperties.setRejectedExecutionType("CallerRunsPolicy");
contextRefresher.refresh();
}

@GetMapping("/threadPool/properties")
publicMap<String,Object>queryThreadPoolProperties(){
Map<String,Object>metricMap=newHashMap<>();
List<Map>threadPools=newArrayList<>();
dynamicThreadPoolManager.getThreadPoolExecutorMap().forEach((k,v)->{
ThreadPoolMonitorthreadPoolMonitor=(ThreadPoolMonitor)v;
Map<String,Object>poolInfo=newHashMap<>();
poolInfo.put("thread.pool.name",k);
poolInfo.put("thread.pool.core.size",threadPoolMonitor.getCorePoolSize());
poolInfo.put("thread.pool.largest.size",threadPoolMonitor.getLargestPoolSize());
poolInfo.put("thread.pool.max.size",threadPoolMonitor.getMaximumPoolSize());
poolInfo.put("thread.pool.thread.count",threadPoolMonitor.getPoolSize());
poolInfo.put("thread.pool.max.costTime",threadPoolMonitor.getMaxCostTime());
poolInfo.put("thread.pool.average.costTime",threadPoolMonitor.getAverageCostTime());
poolInfo.put("thread.pool.min.costTime",threadPoolMonitor.getMinCostTime());
poolInfo.put("thread.pool.active.count",threadPoolMonitor.getActiveCount());
poolInfo.put("thread.pool.completed.taskCount",threadPoolMonitor.getCompletedTaskCount());
poolInfo.put("thread.pool.queue.name",threadPoolMonitor.getQueue().getClass().getName());
poolInfo.put("thread.pool.rejected.name",threadPoolMonitor.getRejectedExecutionHandler().getClass().getName());
poolInfo.put("thread.pool.task.count",threadPoolMonitor.getTaskCount());
threadPools.add(poolInfo);
});
metricMap.put("threadPools",threadPools);
returnmetricMap;
}

}

整体上的流程到这里就完成了,算是一个Demo版,对于该组件更深入的思考我认为还可以做以下三件事情:

  1. 应该以starter的形式嵌入到应用,通过判断启动类加载的Appllo、Nacos还是默认实现;
  2. 对外可以Push、也可以是日志,还可以支持各种库,提供丰富的输出形式,这个样子的话更加通用化;
  3. 提供统一查询接口、修改接口、增加权限校验、增加预警规则配置;

参考以下内容:

美团文章

结束

欢迎大家点点关注,点点赞!

本文共计10760个文字,预计阅读时间需要44分钟。

前言:这篇是推动大家异步编程思想的核心线程池的预备篇,要做好监控,让大家使用无忧,敬畏生产。

为什么需要对线程池进行监控?Java线程池作为最常使用的并发工具,为什么如此信任它?

前言

这篇是推动大家异步编程的思想的线程池的准备篇,要做好监控,让大家使用无后顾之忧,敬畏生产。

为什么需要对线程池进行监控

Java线程池作为最常使用到的并发工具,相信大家都不陌生,但是你真的确定使用对了吗?大名鼎鼎的阿里Java代码规范要求我们不使用 Executors来快速创建线程池,但是抛弃Executors,使用其它方式创建线程池就一定不会出现问题吗?本质上对于我们来说线程池本身的运行过程是一个黑盒,我们没办法了解线程池中的运行状态时,出现问题没有办法及时判断和预警。面对这种黑盒操作必须通过监控方式让其透明化,这样对我们来说才能更好的使用好线程池。因此必须对线程池做监控。

image.png
如何做线程池的监控

对于如何做监控,本质就是涉及三点,分别是数据采集、数据存储以及大盘的展示,接下来我们分说下这三点;

数据采集

采集什么数据,对于我们来说需要采集就是黑盒的数据,什么又是线程池的黑盒数据,其实也就是整个线程处理的整个流程,在整个流程中,我们可以通过ThreadPoolExecutor中的七个方法获取数据,通过这七个方法采集到的数据就可以使线程池的执行过程透明化。

  1. getCorePoolSize():获取核心线程数;
  2. getMaximumPoolSize:获取最大线程数;
  3. getQueue():获取线程池中的阻塞队列,并通过阻塞队列中的方法获取队列长度、元素个数等;
  4. getPoolSize():获取线程池中的工作线程数(包括核心线程和非核心线程);
  5. getActiveCount():获取活跃线程数,也就是正在执行任务的线程;
  6. getLargestPoolSize():获取线程池曾经到过的最大工作线程数;
  7. getTaskCount():获取历史已完成以及正在执行的总的任务数量;

除了我们了解的这些流程以外,ThreadPoolExecutor中还提供了三种钩子函数,

  1. beforeExecute():Worker线程执行任务之前会调用的方法;
  2. afterExecute():在Worker线程执行任务之后会调用的方法;
  3. terminated():当线程池从运行状态变更到TERMINATED状态之前调用的方法;

对于beforeExecute和afterExecute可以理解为使用Aop监听线程执行的时间,这样子我们可以对每个线程运行的时间整体做监控,terminated可以理解为线程关闭时候的监控,这样我们就可以整体获取采集到线程池生命周期的所有数据了。

数据存储以及大盘的展示

对于存储我们这个比较适合采用时序性数据库,此外现在很多成熟的监控产品都可以满足我们大屏展示的诉求,这里推荐下美团Cat和Prometheus,这里不展开进行讲解,大家可以根据自己公司的监控产品进行选择,对于不同的方案采取的存储形式会有些差异,甚至自己都可以自定义实现一个功能,反正难度不大。

进一步扩展以及思考

在实际的项目开发中我们会遇到以下场景:

  1. 不同的业务采用同一个线程池,这样如果某个服务阻塞,会影响到整体共用线程池的所有服务,会触发线程池的拒绝策略;
  2. 流量突然增加,需要动态调整线程池的参数,这个时候又不能重启;

针对这两种场景,我们对线程池再次进行了深入的思考:

  1. 如何合理配置线程池参数;
  2. 如何动态调整线程池参数;
  3. 如何给不同的服务之间做线程池的隔离;
如何合理配置线程池参数

关于这个问题面试的时候也是经常被问到,我只能说这个问题开始就是一个坑,针对与CPU密集型和I/O密集型,线程池的参数是有不同设计的,也不是遵守几个公式就可以搞定,当然可以参考,我认为对于线程池合理的参数的配置是经过多次调整得到的,甚至增加和减少业务都会影响一些参数,我不太建议大家每天背书式的CPU密集型就是N+1,非CPU密集型就是2N,因此我们更希望看到线程池动态配置。

如何动态调整线程池参数

关于如何动态调整线程池,还是回到我们场景问题的解决上,对于流量突增核心就是提升线程池的处理速度,那如何提升线程池的处理速度,有两种方式,一种是加快业务的处理,也就是消费的快,显然这种在运行的业务中我们想改变还是比较困难,这个可以作为复盘的重点;还有一种就是增加消费者,增加消费者的重点就是调整核心线程数以及非核心线程数的数量。

img

居于这种思考,这个时候我们需要看下ThreadPoolExecutor线程池源码,首先看下开始定义的变量,通过变量的设计我们就会发现大师就是大师,大师通过AtomicInteger修饰的ctl变量,高3位存储了线程池的状态,低29存储线程的个数,通过一个变量完成两件事情,完成状态判断以及限制线程最大个数。使用一个HashSet存储Worker的引用,而Worker继承了AbstractQueuedSynchronizer,实现一个一个不可冲入的独占锁保证线程的安全性。

img

//用来标记线程池状态(高3位),线程个数(低29位)
privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING,0));
//工作状态存储在高3位中
privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;
//线程个数所能表达的最大数值
privatestaticfinalintCAPACITY=(1<<COUNT_BITS)-1;
//线程池状态
//RUNNING-1能够接收新任务,也可以处理阻塞队列中的任务
privatestaticfinalintRUNNING=-1<<COUNT_BITS;
//SHUTDOWN0不可以接受新任务,继续处理阻塞队列中的任务
privatestaticfinalintSHUTDOWN=0<<COUNT_BITS;
//STOP1不接收新任务,不处理阻塞队列中的任务,并且会中断正在处理的任务
privatestaticfinalintSTOP=1<<COUNT_BITS;
//TIDYING2所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法;
privatestaticfinalintTIDYING=2<<COUNT_BITS;
//TERMINATED3中止状态,已经执行完terminated()钩子方法
privatestaticfinalintTERMINATED=3<<COUNT_BITS;
//任务队列,当线程池中的线程达到核心线程数量时,再提交任务就会直接提交到workQueue
privatefinalBlockingQueue<Runnable>workQueue;
//线程池全局锁,增加worker减少worker时需要持有mainLock,修改线程池运行状态时,也需要
privatefinalReentrantLockmainLock=newReentrantLock();
//线程池中真正存放worker的地方。
privatefinalHashSet<Worker>workers=newHashSet<Worker>();
privatefinalConditiontermination=mainLock.newCondition();
//记录线程池生命周期内线程数最大值
privateintlargestPoolSize;
//记录线程池所完成任务总数
privatelongcompletedTaskCount;
//创建线程会使用线程工厂
privatevolatileThreadFactorythreadFactory;
//拒绝策略
privatevolatileRejectedExecutionHandlerhandler;
//存活时间
privatevolatilelongkeepAliveTime;
//控制核心线程数量内的线程是否可以被回收。true可以,false不可以。
privatevolatilebooleanallowCoreThreadTimeOut;
//核心线程池数量
privatevolatileintcorePoolSize;
//线程池最大数量
privatevolatileintmaximumPoolSize;

我们的重点看的是volatile修饰的corePoolSize、maximumPoolSize以及keepAliveTime,当然threadFactory和handler也可以看下,不过这两个不是我们解决动态调整线程池的关键。对于这些volatile修饰的关键的变量,从并发角度思考的,必然是有并发读写的操作才使用volatile修饰的,在指标采集中我们看到其get的方法,对于写的操作我们可以猜测肯定提供了set的方式,这个时候我们可以搜索下setCorePoolSize,果不其然我们真的搜索到了。

publicvoidsetCorePoolSize(intcorePoolSize){
if(corePoolSize<0)
thrownewIllegalArgumentException();
intdelta=corePoolSize-this.corePoolSize;
this.corePoolSize=corePoolSize;
//新设置的corePoolSize小于当前核心线程数的时候
//会调用interruptIdleWorkers方法来中断空闲的工作线程
if(workerCountOf(ctl.get())>corePoolSize)
interruptIdleWorkers();
elseif(delta>0){
//当设置的值大于当前值的时候核心线程数的时候
//按照等待队列中的任务数量来创建新的工作线程
intk=Math.min(delta,workQueue.size());
while(k-->0&&addWorker(null,true)){
if(workQueue.isEmpty())
break;
}
}
}

接下来我们看下interruptIdleWorkers的源码,此处源码使用ReentrantLock可重入锁,因为Worker的是通过一个全局的HashSer存储,这里通过ReentrantLock保证线程安全。

privatevoidinterruptIdleWorkers(booleanonlyOne){
//可重入锁
finalReentrantLockmainLock=this.mainLock;
mainLock.lock();
try{
for(Workerw:workers){
Threadt=w.thread;
if(!t.isInterrupted()&&w.tryLock()){
try{
//中断当前线程
t.interrupt();
}catch(SecurityExceptionignore){
}finally{
w.unlock();
}
}
if(onlyOne)
break;
}
}finally{
mainLock.unlock();
}
}

接下来我们在验证一下是否存在其他相关的参数设置,如下:

publicvoidsetMaximumPoolSize(intmaximumPoolSize){
if(maximumPoolSize<=0||maximumPoolSize<corePoolSize)
thrownewIllegalArgumentException();
this.maximumPoolSize=maximumPoolSize;
if(workerCountOf(ctl.get())>maximumPoolSize)
interruptIdleWorkers();
}
publicvoidsetKeepAliveTime(longtime,TimeUnitunit){
if(time<0)
thrownewIllegalArgumentException();
if(time==0&&allowsCoreThreadTimeOut())
thrownewIllegalArgumentException("Corethreadsmusthavenonzerokeepalivetimes");
longkeepAliveTime=unit.toNanos(time);
longdelta=keepAliveTime-this.keepAliveTime;
this.keepAliveTime=keepAliveTime;
if(delta<0)
interruptIdleWorkers();
}
publicvoidsetRejectedExecutionHandler(RejectedExecutionHandlerhandler){
if(handler==null)
thrownewNullPointerException();
this.handler=handler;
}

这里我们会发现一个问题BlockingQueue的队列容量不能修改,看到美团的文章提供的一个可修改的队列ResizableCapacityLinkedBlockingQueue,于是乎去看了一下LinkedBlockingQueue的源码,发现了关于capacity是一个final修饰的,这个时候我就思考一番,这个地方采用volatile修饰,对外暴露可修改,这样就实现了动态修改阻塞队列的大小。

img
如何给不同的服务之间做线程池的隔离
img

关于如何给不同服务之间做线程池的隔离,这里我们可以采用Hystrix的舱壁模式,也就是说针对不同服务类型的服务单独创建线程池,这样就可以实现服务之间不相互影响,不会因为某个服务导致整体的服务影响都阻塞。

实现方案

聊了这么多前置的知识储备,接下来我们来聊聊实现方案,整体的实现方案我们建立在Spring Boot的基础实现,采用Spring Cloud刷新动态配置,采用该方式比较合适单体应用,对于有Appllo和Nacos可以通过监听配置方式的来动态刷新。

  1. Maven依赖如下;

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>

</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Hoxton.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

  1. 配置信息如下:

monitor.threadpool.executors[0].thread-pool-name=first-monitor-thread-pool
monitor.threadpool.executors[0].core-pool-size=4
monitor.threadpool.executors[0].max-pool-size=8
monitor.threadpool.executors[0].queue-capacity=100

monitor.threadpool.executors[1].thread-pool-name=second-monitor-thread-pool
monitor.threadpool.executors[1].core-pool-size=2
monitor.threadpool.executors[1].max-pool-size=4
monitor.threadpool.executors[1].queue-capacity=40

/**
*线程池配置
*
*@authorwangtongzhou
*@since2022-03-1121:41
*/
@Data
publicclassThreadPoolProperties{

/**
*线程池名称
*/
privateStringthreadPoolName;

/**
*核心线程数
*/
privateIntegercorePoolSize=Runtime.getRuntime().availableProcessors();

/**
*最大线程数
*/
privateIntegermaxPoolSize;

/**
*队列最大数量
*/
privateIntegerqueueCapacity;

/**
*拒绝策略
*/
privateStringrejectedExecutionType="AbortPolicy";

/**
*空闲线程存活时间
*/
privateLongkeepAliveTime=1L;

/**
*空闲线程存活时间单位
*/
privateTimeUnitunit=TimeUnit.MILLISECONDS;


}


/**
*动态刷新线程池配置
*
*@authorwangtongzhou
*@since2022-03-1314:09
*/
@ConfigurationProperties(prefix="monitor.threadpool")
@Data
@Component
publicclassDynamicThreadPoolProperties{

privateList<ThreadPoolProperties>executors;
}

  1. 自定可修改阻塞队列大小的方式如下:

/**
*可重新设定队列大小的阻塞队列
*
*@authorwangtongzhou
*@since2022-03-1311:54
*/
publicclassResizableCapacityLinkedBlockingQueue<E>extendsAbstractQueue<E>
implementsBlockingDeque<E>,java.io.Serializable{
/*
*Implementedasasimpledoubly-linkedlistprotectedbya
*singlelockandusingconditionstomanageblocking.
*
*Toimplementweaklyconsistentiterators,itappearsweneedto
*keepallNodesGC-reachablefromapredecessordequeuedNode.
*Thatwouldcausetwoproblems:
*-allowarogueIteratortocauseunboundedmemoryretention
*-causecross-generationallinkingofoldNodestonewNodesif
*aNodewastenuredwhilelive,whichgenerationalGCshavea
*hardtimedealingwith,causingrepeatedmajorcollections.
*However,onlynon-deletedNodesneedtobereachablefrom
*dequeuedNodes,andreachabilitydoesnotnecessarilyhaveto
*beofthekindunderstoodbytheGC.Weusethetrickof
*linkingaNodethathasjustbeendequeuedtoitself.Sucha
*self-linkimplicitlymeanstojumpto"first"(fornextlinks)
*or"last"(forprevlinks).
*/

/*
*Wehave"diamond"multipleinterface/abstractclassinheritance
*here,andthatintroducesambiguities.Oftenwewantthe
*BlockingDequejavadoccombinedwiththeAbstractQueue
*implementation,soalotofmethodspecsareduplicatedhere.
*/

privatestaticfinallongserialVersionUID=-387911632671998426L;

/**
*Doubly-linkedlistnodeclass
*/
staticfinalclassNode<E>{
/**
*Theitem,ornullifthisnodehasbeenremoved.
*/
Eitem;

/**
*Oneof:
*-therealpredecessorNode
*-thisNode,meaningthepredecessoristail
*-null,meaningthereisnopredecessor
*/
Node<E>prev;

/**
*Oneof:
*-therealsuccessorNode
*-thisNode,meaningthesuccessorishead
*-null,meaningthereisnosuccessor
*/
Node<E>next;

Node(Ex){
item=x;
}
}

/**
*Pointertofirstnode.
*Invariant:(first==null&&last==null)||
*(first.prev==null&&first.item!=null)
*/
transientNode<E>first;

/**
*Pointertolastnode.
*Invariant:(first==null&&last==null)||
*(last.next==null&&last.item!=null)
*/
transientNode<E>last;

/**
*Numberofitemsinthedeque
*/
privatetransientintcount;

/**
*Maximumnumberofitemsinthedeque
*/
privatevolatileintcapacity;

publicintgetCapacity(){
returncapacity;
}

publicvoidsetCapacity(intcapacity){
this.capacity=capacity;
}

/**
*Mainlockguardingallaccess
*/
finalReentrantLocklock=newReentrantLock();

/**
*Conditionforwaitingtakes
*/
privatefinalConditionnotEmpty=lock.newCondition();

/**
*Conditionforwaitingputs
*/
privatefinalConditionnotFull=lock.newCondition();

/**
*Createsa{@codeResizableCapacityLinkedBlockIngQueue}withacapacityof
*{@linkInteger#MAX_VALUE}.
*/
publicResizableCapacityLinkedBlockingQueue(){
this(Integer.MAX_VALUE);
}

/**
*Createsa{@codeResizableCapacityLinkedBlockIngQueue}withthegiven(fixed)capacity.
*
*@paramcapacitythecapacityofthisdeque
*@throwsIllegalArgumentExceptionif{@codecapacity}islessthan1
*/
publicResizableCapacityLinkedBlockingQueue(intcapacity){
if(capacity<=0){
thrownewIllegalArgumentException();
}
this.capacity=capacity;
}

/**
*Createsa{@codeResizableCapacityLinkedBlockIngQueue}withacapacityof
*{@linkInteger#MAX_VALUE},initiallycontainingtheelementsof
*thegivencollection,addedintraversalorderofthe
*collection'siterator.
*
*@paramcthecollectionofelementstoinitiallycontain
*@throwsNullPointerExceptionifthespecifiedcollectionorany
*ofitselementsarenull
*/
publicResizableCapacityLinkedBlockingQueue(Collection<?extendsE>c){
this(Integer.MAX_VALUE);
finalReentrantLocklock=this.lock;
lock.lock();//Nevercontended,butnecessaryforvisibility
try{
for(Ee:c){
if(e==null){
thrownewNullPointerException();
}
if(!linkLast(newNode<E>(e))){
thrownewIllegalStateException("Dequefull");
}
}
}finally{
lock.unlock();
}
}


//Basiclinkingandunlinkingoperations,calledonlywhileholdinglock

/**
*Linksnodeasfirstelement,orreturnsfalseiffull.
*/
privatebooleanlinkFirst(Node<E>node){
//assertlock.isHeldByCurrentThread();
if(count>=capacity){
returnfalse;
}
Node<E>f=first;
node.next=f;
first=node;
if(last==null){
last=node;
}else{
f.prev=node;
}
++count;
notEmpty.signal();
returntrue;
}

/**
*Linksnodeaslastelement,orreturnsfalseiffull.
*/
privatebooleanlinkLast(Node<E>node){
//assertlock.isHeldByCurrentThread();
if(count>=capacity){
returnfalse;
}
Node<E>l=last;
node.prev=l;
last=node;
if(first==null){
first=node;
}else{
l.next=node;
}
++count;
notEmpty.signal();
returntrue;
}

/**
*Removesandreturnsfirstelement,ornullifempty.
*/
privateEunlinkFirst(){
//assertlock.isHeldByCurrentThread();
Node<E>f=first;
if(f==null){
returnnull;
}
Node<E>n=f.next;
Eitem=f.item;
f.item=null;
f.next=f;//helpGC
first=n;
if(n==null){
last=null;
}else{
n.prev=null;
}
--count;
notFull.signal();
returnitem;
}

/**
*Removesandreturnslastelement,ornullifempty.
*/
privateEunlinkLast(){
//assertlock.isHeldByCurrentThread();
Node<E>l=last;
if(l==null){
returnnull;
}
Node<E>p=l.prev;
Eitem=l.item;
l.item=null;
l.prev=l;//helpGC
last=p;
if(p==null){
first=null;
}else{
p.next=null;
}
--count;
notFull.signal();
returnitem;
}

/**
*Unlinksx.
*/
voidunlink(Node<E>x){
//assertlock.isHeldByCurrentThread();
Node<E>p=x.prev;
Node<E>n=x.next;
if(p==null){
unlinkFirst();
}elseif(n==null){
unlinkLast();
}else{
p.next=n;
n.prev=p;
x.item=null;
//Don'tmesswithx'slinks.Theymaystillbeinuseby
//aniterator.
--count;
notFull.signal();
}
}

//BlockingDequemethods

/**
*@throwsIllegalStateExceptionifthisdequeisfull
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicvoidaddFirst(Ee){
if(!offerFirst(e)){
thrownewIllegalStateException("Dequefull");
}
}

/**
*@throwsIllegalStateExceptionifthisdequeisfull
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicvoidaddLast(Ee){
if(!offerLast(e)){
thrownewIllegalStateException("Dequefull");
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicbooleanofferFirst(Ee){
if(e==null){
thrownewNullPointerException();
}
Node<E>node=newNode<E>(e);
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnlinkFirst(node);
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicbooleanofferLast(Ee){
if(e==null)thrownewNullPointerException();
Node<E>node=newNode<E>(e);
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnlinkLast(node);
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicvoidputFirst(Ee)throwsInterruptedException{
if(e==null){
thrownewNullPointerException();
}
Node<E>node=newNode<E>(e);
finalReentrantLocklock=this.lock;
lock.lock();
try{
while(!linkFirst(node)){
notFull.await();
}
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicvoidputLast(Ee)throwsInterruptedException{
if(e==null){
thrownewNullPointerException();
}
Node<E>node=newNode<E>(e);
finalReentrantLocklock=this.lock;
lock.lock();
try{
while(!linkLast(node)){
notFull.await();
}
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicbooleanofferFirst(Ee,longtimeout,TimeUnitunit)
throwsInterruptedException{
if(e==null){
thrownewNullPointerException();
}
Node<E>node=newNode<E>(e);
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
while(!linkFirst(node)){
if(nanos<=0){
returnfalse;
}
nanos=notFull.awaitNanos(nanos);
}
returntrue;
}finally{
lock.unlock();
}
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicbooleanofferLast(Ee,longtimeout,TimeUnitunit)
throwsInterruptedException{
if(e==null)thrownewNullPointerException();
Node<E>node=newNode<E>(e);
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
while(!linkLast(node)){
if(nanos<=0){
returnfalse;
}
nanos=notFull.awaitNanos(nanos);
}
returntrue;
}finally{
lock.unlock();
}
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEremoveFirst(){
Ex=pollFirst();
if(x==null){
thrownewNoSuchElementException();
}
returnx;
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEremoveLast(){
Ex=pollLast();
if(x==null){
thrownewNoSuchElementException();
}
returnx;
}

@Override
publicEpollFirst(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnunlinkFirst();
}finally{
lock.unlock();
}
}

@Override
publicEpollLast(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returnunlinkLast();
}finally{
lock.unlock();
}
}

@Override
publicEtakeFirst()throwsInterruptedException{
finalReentrantLocklock=this.lock;
lock.lock();
try{
Ex;
while((x=unlinkFirst())==null){
notEmpty.await();
}
returnx;
}finally{
lock.unlock();
}
}

@Override
publicEtakeLast()throwsInterruptedException{
finalReentrantLocklock=this.lock;
lock.lock();
try{
Ex;
while((x=unlinkLast())==null){
notEmpty.await();
}
returnx;
}finally{
lock.unlock();
}
}

@Override
publicEpollFirst(longtimeout,TimeUnitunit)
throwsInterruptedException{
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
Ex;
while((x=unlinkFirst())==null){
if(nanos<=0){
returnnull;
}
nanos=notEmpty.awaitNanos(nanos);
}
returnx;
}finally{
lock.unlock();
}
}

@Override
publicEpollLast(longtimeout,TimeUnitunit)
throwsInterruptedException{
longnanos=unit.toNanos(timeout);
finalReentrantLocklock=this.lock;
lock.lockInterruptibly();
try{
Ex;
while((x=unlinkLast())==null){
if(nanos<=0){
returnnull;
}
nanos=notEmpty.awaitNanos(nanos);
}
returnx;
}finally{
lock.unlock();
}
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEgetFirst(){
Ex=peekFirst();
if(x==null){
thrownewNoSuchElementException();
}
returnx;
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEgetLast(){
Ex=peekLast();
if(x==null){
thrownewNoSuchElementException();
}
returnx;
}

@Override
publicEpeekFirst(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
return(first==null)?null:first.item;
}finally{
lock.unlock();
}
}

@Override
publicEpeekLast(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
return(last==null)?null:last.item;
}finally{
lock.unlock();
}
}

@Override
publicbooleanremoveFirstOccurrence(Objecto){
if(o==null){
returnfalse;
}
finalReentrantLocklock=this.lock;
lock.lock();
try{
for(Node<E>p=first;p!=null;p=p.next){
if(o.equals(p.item)){
unlink(p);
returntrue;
}
}
returnfalse;
}finally{
lock.unlock();
}
}

@Override
publicbooleanremoveLastOccurrence(Objecto){
if(o==null){
returnfalse;
}
finalReentrantLocklock=this.lock;
lock.lock();
try{
for(Node<E>p=last;p!=null;p=p.prev){
if(o.equals(p.item)){
unlink(p);
returntrue;
}
}
returnfalse;
}finally{
lock.unlock();
}
}

//BlockingQueuemethods

/**
*Insertsthespecifiedelementattheendofthisdequeunlessitwould
*violatecapacityrestrictions.Whenusingacapacity-restricteddeque,
*itisgenerallypreferabletousemethod{@link#offer(Object)offer}.
*
*<p>Thismethodisequivalentto{@link#addLast}.
*
*@throwsIllegalStateExceptionifthisdequeisfull
*@throwsNullPointerExceptionifthespecifiedelementisnull
*/
@Override
publicbooleanadd(Ee){
addLast(e);
returntrue;
}

/**
*@throwsNullPointerExceptionifthespecifiedelementisnull
*/
@Override
publicbooleanoffer(Ee){
returnofferLast(e);
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicvoidput(Ee)throwsInterruptedException{
putLast(e);
}

/**
*@throwsNullPointerException{@inheritDoc}
*@throwsInterruptedException{@inheritDoc}
*/
@Override
publicbooleanoffer(Ee,longtimeout,TimeUnitunit)
throwsInterruptedException{
returnofferLast(e,timeout,unit);
}

/**
*Retrievesandremovestheheadofthequeuerepresentedbythisdeque.
*Thismethoddiffersfrom{@link#pollpoll}onlyinthatitthrowsan
*exceptionifthisdequeisempty.
*
*<p>Thismethodisequivalentto{@link#removeFirst()removeFirst}.
*
*@returntheheadofthequeuerepresentedbythisdeque
*@throwsNoSuchElementExceptionifthisdequeisempty
*/
@Override
publicEremove(){
returnremoveFirst();
}

@Override
publicEpoll(){
returnpollFirst();
}

@Override
publicEtake()throwsInterruptedException{
returntakeFirst();
}

@Override
publicEpoll(longtimeout,TimeUnitunit)throwsInterruptedException{
returnpollFirst(timeout,unit);
}

/**
*Retrieves,butdoesnotremove,theheadofthequeuerepresentedby
*thisdeque.Thismethoddiffersfrom{@link#peekpeek}onlyinthat
*itthrowsanexceptionifthisdequeisempty.
*
*<p>Thismethodisequivalentto{@link#getFirst()getFirst}.
*
*@returntheheadofthequeuerepresentedbythisdeque
*@throwsNoSuchElementExceptionifthisdequeisempty
*/
@Override
publicEelement(){
returngetFirst();
}

@Override
publicEpeek(){
returnpeekFirst();
}

/**
*Returnsthenumberofadditionalelementsthatthisdequecanideally
*(intheabsenceofmemoryorresourceconstraints)acceptwithout
*blocking.Thisisalwaysequaltotheinitialcapacityofthisdeque
*lessthecurrent{@codesize}ofthisdeque.
*
*<p>Notethatyou<em>cannot</em>alwaystellifanattempttoinsert
*anelementwillsucceedbyinspecting{@coderemainingCapacity}
*becauseitmaybethecasethatanotherthreadisaboutto
*insertorremoveanelement.
*/
@Override
publicintremainingCapacity(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returncapacity-count;
}finally{
lock.unlock();
}
}

/**
*@throwsUnsupportedOperationException{@inheritDoc}
*@throwsClassCastException{@inheritDoc}
*@throwsNullPointerException{@inheritDoc}
*@throwsIllegalArgumentException{@inheritDoc}
*/
@Override
publicintdrainTo(Collection<?superE>c){
returndrainTo(c,Integer.MAX_VALUE);
}

/**
*@throwsUnsupportedOperationException{@inheritDoc}
*@throwsClassCastException{@inheritDoc}
*@throwsNullPointerException{@inheritDoc}
*@throwsIllegalArgumentException{@inheritDoc}
*/
@Override
publicintdrainTo(Collection<?superE>c,intmaxElements){
if(c==null){
thrownewNullPointerException();
}
if(c==this){
thrownewIllegalArgumentException();
}
if(maxElements<=0){
return0;
}
finalReentrantLocklock=this.lock;
lock.lock();
try{
intn=Math.min(maxElements,count);
for(inti=0;i<n;i++){
c.add(first.item);//Inthisorder,incaseadd()throws.
unlinkFirst();
}
returnn;
}finally{
lock.unlock();
}
}

//Stackmethods

/**
*@throwsIllegalStateExceptionifthisdequeisfull
*@throwsNullPointerException{@inheritDoc}
*/
@Override
publicvoidpush(Ee){
addFirst(e);
}

/**
*@throwsNoSuchElementException{@inheritDoc}
*/
@Override
publicEpop(){
returnremoveFirst();
}

//Collectionmethods

/**
*Removesthefirstoccurrenceofthespecifiedelementfromthisdeque.
*Ifthedequedoesnotcontaintheelement,itisunchanged.
*Moreformally,removesthefirstelement{@codee}suchthat
*{@codeo.equals(e)}(ifsuchanelementexists).
*Returns{@codetrue}ifthisdequecontainedthespecifiedelement
*(orequivalently,ifthisdequechangedasaresultofthecall).
*
*<p>Thismethodisequivalentto
*{@link#removeFirstOccurrence(Object)removeFirstOccurrence}.
*
*@paramoelementtoberemovedfromthisdeque,ifpresent
*@return{@codetrue}ifthisdequechangedasaresultofthecall
*/
@Override
publicbooleanremove(Objecto){
returnremoveFirstOccurrence(o);
}

/**
*Returnsthenumberofelementsinthisdeque.
*
*@returnthenumberofelementsinthisdeque
*/
@Override
publicintsize(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
returncount;
}finally{
lock.unlock();
}
}

/**
*Returns{@codetrue}ifthisdequecontainsthespecifiedelement.
*Moreformally,returns{@codetrue}ifandonlyifthisdequecontains
*atleastoneelement{@codee}suchthat{@codeo.equals(e)}.
*
*@paramoobjecttobecheckedforcontainmentinthisdeque
*@return{@codetrue}ifthisdequecontainsthespecifiedelement
*/
@Override
publicbooleancontains(Objecto){
if(o==null){
returnfalse;
}
finalReentrantLocklock=this.lock;
lock.lock();
try{
for(Node<E>p=first;p!=null;p=p.next){
if(o.equals(p.item)){
returntrue;
}
}
returnfalse;
}finally{
lock.unlock();
}
}

/*
*TODO:Addsupportformoreefficientbulkoperations.
*
*Wedon'twanttoacquirethelockforeveryiteration,butwe
*alsowantotherthreadsachancetointeractwiththe
*collection,especiallywhencountisclosetocapacity.
*/

///**
//*Addsalloftheelementsinthespecifiedcollectiontothis
//*queue.AttemptstoaddAllofaqueuetoitselfresultin
//*{@codeIllegalArgumentException}.Further,thebehaviorof
//*thisoperationisundefinedifthespecifiedcollectionis
//*modifiedwhiletheoperationisinprogress.
//*
//*@paramccollectioncontainingelementstobeaddedtothisqueue
//*@return{@codetrue}ifthisqueuechangedasaresultofthecall
//*@throwsClassCastException{@inheritDoc}
//*@throwsNullPointerException{@inheritDoc}
//*@throwsIllegalArgumentException{@inheritDoc}
//*@throwsIllegalStateExceptionifthisdequeisfull
//*@see#add(Object)
//*/
//publicbooleanaddAll(Collection<?extendsE>c){
//if(c==null)
//thrownewNullPointerException();
//if(c==this)
//thrownewIllegalArgumentException();
//finalReentrantLocklock=this.lock;
//lock.lock();
//try{
//booleanmodified=false;
//for(Ee:c)
//if(linkLast(e))
//modified=true;
//returnmodified;
//}finally{
//lock.unlock();
//}
//}

/**
*Returnsanarraycontainingalloftheelementsinthisdeque,in
*propersequence(fromfirsttolastelement).
*
*<p>Thereturnedarraywillbe"safe"inthatnoreferencestoitare
*maintainedbythisdeque.(Inotherwords,thismethodmustallocate
*anewarray).Thecalleristhusfreetomodifythereturnedarray.
*
*<p>Thismethodactsasbridgebetweenarray-basedandcollection-based
*APIs.
*
*@returnanarraycontainingalloftheelementsinthisdeque
*/
@Override
@SuppressWarnings("unchecked")
publicObject[]toArray(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
Object[]a=newObject[count];
intk=0;
for(Node<E>p=first;p!=null;p=p.next){
a[k++]=p.item;
}
returna;
}finally{
lock.unlock();
}
}

/**
*Returnsanarraycontainingalloftheelementsinthisdeque,in
*propersequence;theruntimetypeofthereturnedarrayisthatof
*thespecifiedarray.Ifthedequefitsinthespecifiedarray,it
*isreturnedtherein.Otherwise,anewarrayisallocatedwiththe
*runtimetypeofthespecifiedarrayandthesizeofthisdeque.
*
*<p>Ifthisdequefitsinthespecifiedarraywithroomtospare
*(i.e.,thearrayhasmoreelementsthanthisdeque),theelementin
*thearrayimmediatelyfollowingtheendofthedequeissetto
*{@codenull}.
*
*<p>Likethe{@link#toArray()}method,thismethodactsasbridgebetween
*array-basedandcollection-basedAPIs.Further,thismethodallows
*precisecontrolovertheruntimetypeoftheoutputarray,andmay,
*undercertaincircumstances,beusedtosaveallocationcosts.
*
*<p>Suppose{@codex}isadequeknowntocontainonlystrings.
*Thefollowingcodecanbeusedtodumpthedequeintoanewly
*allocatedarrayof{@codeString}:
*
*<pre>{@codeString[]y=x.toArray(newString[0]);}</pre>
*<p>
*Notethat{@codetoArray(newObject[0])}isidenticalinfunctionto
*{@codetoArray()}.
*
*@paramathearrayintowhichtheelementsofthedequeareto
*bestored,ifitisbigenough;otherwise,anewarrayofthe
*sameruntimetypeisallocatedforthispurpose
*@returnanarraycontainingalloftheelementsinthisdeque
*@throwsArrayStoreExceptioniftheruntimetypeofthespecifiedarray
*isnotasupertypeoftheruntimetypeofeveryelementin
*thisdeque
*@throwsNullPointerExceptionifthespecifiedarrayisnull
*/
@Override
@SuppressWarnings("unchecked")
public<T>T[]toArray(T[]a){
finalReentrantLocklock=this.lock;
lock.lock();
try{
if(a.length<count){
a=(T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(),count);
}
intk=0;
for(Node<E>p=first;p!=null;p=p.next){
a[k++]=(T)p.item;
}
if(a.length>k){
a[k]=null;
}
returna;
}finally{
lock.unlock();
}
}

@Override
publicStringtoString(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
Node<E>p=first;
if(p==null){
return"[]";
}
StringBuildersb=newStringBuilder();
sb.append('[');
for(;;){
Ee=p.item;
sb.append(e==this?"(thisCollection)":e);
p=p.next;
if(p==null){
returnsb.append(']').toString();
}
sb.append(',').append('');
}
}finally{
lock.unlock();
}
}

/**
*Atomicallyremovesalloftheelementsfromthisdeque.
*Thedequewillbeemptyafterthiscallreturns.
*/
@Override
publicvoidclear(){
finalReentrantLocklock=this.lock;
lock.lock();
try{
for(Node<E>f=first;f!=null;){
f.item=null;
Node<E>n=f.next;
f.prev=null;
f.next=null;
f=n;
}
first=last=null;
count=0;
notFull.signalAll();
}finally{
lock.unlock();
}
}

/**
*Returnsaniteratorovertheelementsinthisdequeinpropersequence.
*Theelementswillbereturnedinorderfromfirst(head)tolast(tail).
*
*<p>Thereturnediteratoris
*<ahref="package-summary.html#Weakly"><i>weaklyconsistent</i></a>.
*
*@returnaniteratorovertheelementsinthisdequeinpropersequence
*/
@Override
publicIterator<E>iterator(){
returnnewItr();
}

/**
*Returnsaniteratorovertheelementsinthisdequeinreverse
*sequentialorder.Theelementswillbereturnedinorderfrom
*last(tail)tofirst(head).
*
*<p>Thereturnediteratoris
*<ahref="package-summary.html#Weakly"><i>weaklyconsistent</i></a>.
*
*@returnaniteratorovertheelementsinthisdequeinreverseorder
*/
@Override
publicIterator<E>descendingIterator(){
returnnewDescendingItr();
}

/**
*BaseclassforIteratorsforResizableCapacityLinkedBlockIngQueue
*/
privateabstractclassAbstractItrimplementsIterator<E>{
/**
*Thenextnodetoreturninnext()
*/
Node<E>next;

/**
*nextItemholdsontoitemfieldsbecauseonceweclaimthat
*anelementexistsinhasNext(),wemustreturnitemread
*underlock(inadvance())evenifitwasintheprocessof
*beingremovedwhenhasNext()wascalled.
*/
EnextItem;

/**
*Nodereturnedbymostrecentcalltonext.Neededbyremove.
*Resettonullifthiselementisdeletedbyacalltoremove.
*/
privateNode<E>lastRet;

abstractNode<E>firstNode();

abstractNode<E>nextNode(Node<E>n);

AbstractItr(){
//settoinitialposition
finalReentrantLocklock=ResizableCapacityLinkedBlockingQueue.this.lock;
lock.lock();
try{
next=firstNode();
nextItem=(next==null)?null:next.item;
}finally{
lock.unlock();
}
}

/**
*Returnsthesuccessornodeofthegivennon-null,but
*possiblypreviouslydeleted,node.
*/
privateNode<E>succ(Node<E>n){
//Chainsofdeletednodesendinginnullorself-links
//arepossibleifmultipleinteriornodesareremoved.
for(;;){
Node<E>s=nextNode(n);
if(s==null){
returnnull;
}elseif(s.item!=null){
returns;
}elseif(s==n){
returnfirstNode();
}else{
n=s;
}
}
}

/**
*Advancesnext.
*/
voidadvance(){
finalReentrantLocklock=ResizableCapacityLinkedBlockingQueue.this.lock;
lock.lock();
try{
//assertnext!=null;
next=succ(next);
nextItem=(next==null)?null:next.item;
}finally{
lock.unlock();
}
}

@Override
publicbooleanhasNext(){
returnnext!=null;
}

@Override
publicEnext(){
if(next==null){
thrownewNoSuchElementException();
}
lastRet=next;
Ex=nextItem;
advance();
returnx;
}

@Override
publicvoidremove(){
Node<E>n=lastRet;
if(n==null){
thrownewIllegalStateException();
}
lastRet=null;
finalReentrantLocklock=ResizableCapacityLinkedBlockingQueue.this.lock;
lock.lock();
try{
if(n.item!=null){
unlink(n);
}
}finally{
lock.unlock();
}
}
}

/**
*Forwarditerator
*/
privateclassItrextendsAbstractItr{
@Override
Node<E>firstNode(){
returnfirst;
}

@Override
Node<E>nextNode(Node<E>n){
returnn.next;
}
}

/**
*Descendingiterator
*/
privateclassDescendingItrextendsAbstractItr{
@Override
Node<E>firstNode(){
returnlast;
}

@Override
Node<E>nextNode(Node<E>n){
returnn.prev;
}
}

/**
*AcustomizedvariantofSpliterators.IteratorSpliterator
*/
staticfinalclassLBDSpliterator<E>implementsSpliterator<E>{
staticfinalintMAX_BATCH=1<<25;//maxbatcharraysize;
finalResizableCapacityLinkedBlockingQueue<E>queue;
Node<E>current;//currentnode;nulluntilinitialized
intbatch;//batchsizeforsplits
booleanexhausted;//truewhennomorenodes
longest;//sizeestimate

LBDSpliterator(ResizableCapacityLinkedBlockingQueue<E>queue){
this.queue=queue;
this.est=queue.size();
}

@Override
publiclongestimateSize(){
returnest;
}

@Override
publicSpliterator<E>trySplit(){
Node<E>h;
finalResizableCapacityLinkedBlockingQueue<E>q=this.queue;
intb=batch;
intn=(b<=0)?1:(b>=MAX_BATCH)?MAX_BATCH:b+1;
if(!exhausted&&
((h=current)!=null||(h=q.first)!=null)&&
h.next!=null){
Object[]a=newObject[n];
finalReentrantLocklock=q.lock;
inti=0;
Node<E>p=current;
lock.lock();
try{
if(p!=null||(p=q.first)!=null){
do{
if((a[i]=p.item)!=null){
++i;
}
}while((p=p.next)!=null&&i<n);
}
}finally{
lock.unlock();
}
if((current=p)==null){
est=0L;
exhausted=true;
}elseif((est-=i)<0L){
est=0L;
}
if(i>0){
batch=i;
returnSpliterators.spliterator
(a,0,i,Spliterator.ORDERED|Spliterator.NONNULL|
Spliterator.CONCURRENT);
}
}
returnnull;
}

@Override
publicvoidforEachRemaining(Consumer<?superE>action){
if(action==null){
thrownewNullPointerException();
}
finalResizableCapacityLinkedBlockingQueue<E>q=this.queue;
finalReentrantLocklock=q.lock;
if(!exhausted){
exhausted=true;
Node<E>p=current;
do{
Ee=null;
lock.lock();
try{
if(p==null){
p=q.first;
}
while(p!=null){
e=p.item;
p=p.next;
if(e!=null){
break;
}
}
}finally{
lock.unlock();
}
if(e!=null){
action.accept(e);
}
}while(p!=null);
}
}

@Override
publicbooleantryAdvance(Consumer<?superE>action){
if(action==null){
thrownewNullPointerException();
}
finalResizableCapacityLinkedBlockingQueue<E>q=this.queue;
finalReentrantLocklock=q.lock;
if(!exhausted){
Ee=null;
lock.lock();
try{
if(current==null){
current=q.first;
}
while(current!=null){
e=current.item;
current=current.next;
if(e!=null){
break;
}
}
}finally{
lock.unlock();
}
if(current==null){
exhausted=true;
}
if(e!=null){
action.accept(e);
returntrue;
}
}
returnfalse;
}

@Override
publicintcharacteristics(){
returnSpliterator.ORDERED|Spliterator.NONNULL|
Spliterator.CONCURRENT;
}
}

/**
*Returnsa{@linkSpliterator}overtheelementsinthisdeque.
*
*<p>Thereturnedspliteratoris
*<ahref="package-summary.html#Weakly"><i>weaklyconsistent</i></a>.
*
*<p>The{@codeSpliterator}reports{@linkSpliterator#CONCURRENT},
*{@linkSpliterator#ORDERED},and{@linkSpliterator#NONNULL}.
*
*@returna{@codeSpliterator}overtheelementsinthisdeque
*@implNoteThe{@codeSpliterator}implements{@codetrySplit}topermitlimited
*parallelism.
*@since1.8
*/
@Override
publicSpliterator<E>spliterator(){
returnnewLBDSpliterator<E>(this);
}

/**
*Savesthisdequetoastream(thatis,serializesit).
*
*@paramsthestream
*@throwsjava.io.IOExceptionifanI/Oerroroccurs
*@serialDataThecapacity(int),followedbyelements(eachan
*{@codeObject})intheproperorder,followedbyanull
*/
privatevoidwriteObject(java.io.ObjectOutputStreams)
throwsjava.io.IOException{
finalReentrantLocklock=this.lock;
lock.lock();
try{
//Writeoutcapacityandanyhiddenstuff
s.defaultWriteObject();
//Writeoutallelementsintheproperorder.
for(Node<E>p=first;p!=null;p=p.next){
s.writeObject(p.item);
}
//Usetrailingnullassentinel
s.writeObject(null);
}finally{
lock.unlock();
}
}

/**
*Reconstitutesthisdequefromastream(thatis,deserializesit).
*
*@paramsthestream
*@throwsClassNotFoundExceptioniftheclassofaserializedobject
*couldnotbefound
*@throwsjava.io.IOExceptionifanI/Oerroroccurs
*/
privatevoidreadObject(java.io.ObjectInputStreams)
throwsjava.io.IOException,ClassNotFoundException{
s.defaultReadObject();
count=0;
first=null;
last=null;
//Readinallelementsandplaceinqueue
for(;;){
@SuppressWarnings("unchecked")
Eitem=(E)s.readObject();
if(item==null){
break;
}
add(item);
}
}
}

  1. 自定义线程池,增加每个线程处理的耗时,以及平均耗时、最大耗时、最小耗时,以及输出监控日志信息等等;

/**
*线程池监控类
*
*@authorwangtongzhou
*@since2022-02-2307:27
*/
publicclassThreadPoolMonitorextendsThreadPoolExecutor{

privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(ThreadPoolMonitor.class);

/**
*默认拒绝策略
*/
privatestaticfinalRejectedExecutionHandlerdefaultHandler=newAbortPolicy();

/**
*线程池名称,一般以业务名称命名,方便区分
*/
privateStringpoolName;

/**
*最短执行时间
*/
privateLongminCostTime;

/**
*最长执行时间
*/
privateLongmaxCostTime;
/**
*总的耗时
*/
privateAtomicLongtotalCostTime=newAtomicLong();

privateThreadLocal<Long>startTimeThreadLocal=newThreadLocal<>();

/**
*调用父类的构造方法,并初始化HashMap和线程池名称
*
*@paramcorePoolSize线程池核心线程数
*@parammaximumPoolSize线程池最大线程数
*@paramkeepAliveTime线程的最大空闲时间
*@paramunit空闲时间的单位
*@paramworkQueue保存被提交任务的队列
*@parampoolName线程池名称
*/
publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueue<Runnable>workQueue,StringpoolName){
this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
Executors.defaultThreadFactory(),poolName);
}


/**
*调用父类的构造方法,并初始化HashMap和线程池名称
*
*@paramcorePoolSize线程池核心线程数
*@parammaximumPoolSize线程池最大线程数
*@paramkeepAliveTime线程的最大空闲时间
*@paramunit空闲时间的单位
*@paramworkQueue保存被提交任务的队列
*@param
*@parampoolName线程池名称
*/
publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueue<Runnable>workQueue,RejectedExecutionHandlerhandler,StringpoolName){
this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
Executors.defaultThreadFactory(),handler,poolName);
}


/**
*调用父类的构造方法,并初始化HashMap和线程池名称
*
*@paramcorePoolSize线程池核心线程数
*@parammaximumPoolSize线程池最大线程数
*@paramkeepAliveTime线程的最大空闲时间
*@paramunit空闲时间的单位
*@paramworkQueue保存被提交任务的队列
*@paramthreadFactory线程工厂
*@parampoolName线程池名称
*/
publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,StringpoolName){
super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,defaultHandler);
this.poolName=poolName;
}


/**
*调用父类的构造方法,并初始化HashMap和线程池名称
*
*@paramcorePoolSize线程池核心线程数
*@parammaximumPoolSize线程池最大线程数
*@paramkeepAliveTime线程的最大空闲时间
*@paramunit空闲时间的单位
*@paramworkQueue保存被提交任务的队列
*@paramthreadFactory线程工厂
*@paramhandler拒绝策略
*@parampoolName线程池名称
*/
publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,longkeepAliveTime,
TimeUnitunit,BlockingQueue<Runnable>workQueue,
ThreadFactorythreadFactory,RejectedExecutionHandlerhandler,StringpoolName){
super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
this.poolName=poolName;
}


/**
*线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
*/
@Override
publicvoidshutdown(){
//统计已执行任务、正在执行任务、未执行任务数量
LOGGER.info("{}关闭线程池,已执行任务:{},正在执行任务:{},未执行任务数量:{}",
this.poolName,this.getCompletedTaskCount(),this.getActiveCount(),this.getQueue().size());
super.shutdown();
}

/**
*线程池立即关闭时,统计线程池情况
*/
@Override
publicList<Runnable>shutdownNow(){
//统计已执行任务、正在执行任务、未执行任务数量
LOGGER.info("{}立即关闭线程池,已执行任务:{},正在执行任务:{},未执行任务数量:{}",
this.poolName,this.getCompletedTaskCount(),this.getActiveCount(),this.getQueue().size());
returnsuper.shutdownNow();
}

/**
*任务执行之前,记录任务开始时间
*/
@Override
protectedvoidbeforeExecute(Threadt,Runnabler){
startTimeThreadLocal.set(System.currentTimeMillis());
}

/**
*任务执行之后,计算任务结束时间
*/
@Override
protectedvoidafterExecute(Runnabler,Throwablet){
longcostTime=System.currentTimeMillis()-startTimeThreadLocal.get();
startTimeThreadLocal.remove();
maxCostTime=maxCostTime>costTime?maxCostTime:costTime;
if(getCompletedTaskCount()==0){
minCostTime=costTime;
}
minCostTime=minCostTime<costTime?minCostTime:costTime;
totalCostTime.addAndGet(costTime);
LOGGER.info("{}-pool-monitor:"+
"任务耗时:{}ms,初始线程数:{},核心线程数:{},执行的任务数量:{},"+
"已完成任务数量:{},任务总数:{},队列里缓存的任务数量:{},池中存在的最大线程数:{},"+
"最大允许的线程数:{},线程空闲时间:{},线程池是否关闭:{},线程池是否终止:{}",
this.poolName,
costTime,this.getPoolSize(),this.getCorePoolSize(),this.getActiveCount(),
this.getCompletedTaskCount(),this.getTaskCount(),this.getQueue().size(),this.getLargestPoolSize(),
this.getMaximumPoolSize(),this.getKeepAliveTime(TimeUnit.MILLISECONDS),this.isShutdown(),this.isTerminated());
}


publicLonggetMinCostTime(){
returnminCostTime;
}

publicLonggetMaxCostTime(){
returnmaxCostTime;
}

publiclonggetAverageCostTime(){
if(getCompletedTaskCount()==0||totalCostTime.get()==0){
return0;
}
returntotalCostTime.get()/getCompletedTaskCount();
}

/**
*生成线程池所用的线程,改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪
*/
staticclassMonitorThreadFactoryimplementsThreadFactory{
privatestaticfinalAtomicIntegerpoolNumber=newAtomicInteger(1);
privatefinalThreadGroupgroup;
privatefinalAtomicIntegerthreadNumber=newAtomicInteger(1);
privatefinalStringnamePrefix;

/**
*初始化线程工厂
*
*@parampoolName线程池名称
*/
MonitorThreadFactory(StringpoolName){
SecurityManagers=System.getSecurityManager();
group=Objects.nonNull(s)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
namePrefix=poolName+"-pool-"+poolNumber.getAndIncrement()+"-thread-";
}

@Override
publicThreadnewThread(Runnabler){
Threadt=newThread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
if(t.isDaemon()){
t.setDaemon(false);
}
if(t.getPriority()!=Thread.NORM_PRIORITY){
t.setPriority(Thread.NORM_PRIORITY);
}
returnt;
}
}
}

  1. 动态修改线程池的类,通过Spring的监听器监控配置刷新方法,实现动态更新线程池的参数;

/**
*动态刷新线程池
*
*@authorwangtongzhou
*@since2022-03-1314:13
*/
@Component
@Slf4j
publicclassDynamicThreadPoolManager{


@Autowired
privateDynamicThreadPoolPropertiesdynamicThreadPoolProperties;

/**
*存储线程池对象
*/
publicMap<String,ThreadPoolMonitor>threadPoolExecutorMap=newHashMap<>();


publicMap<String,ThreadPoolMonitor>getThreadPoolExecutorMap(){
returnthreadPoolExecutorMap;
}


/**
*初始化线程池
*/
@PostConstruct
publicvoidinit(){
createThreadPools(dynamicThreadPoolProperties);
}

/**
*初始化线程池的创建
*
*@paramdynamicThreadPoolProperties
*/
privatevoidcreateThreadPools(DynamicThreadPoolPropertiesdynamicThreadPoolProperties){
dynamicThreadPoolProperties.getExecutors().forEach(config->{
if(!threadPoolExecutorMap.containsKey(config.getThreadPoolName())){
ThreadPoolMonitorthreadPoolMonitor=newThreadPoolMonitor(
config.getCorePoolSize(),
config.getMaxPoolSize(),
config.getKeepAliveTime(),
config.getUnit(),
newResizableCapacityLinkedBlockingQueue<>(config.getQueueCapacity()),
RejectedExecutionHandlerEnum.getRejectedExecutionHandler(config.getRejectedExecutionType()),
config.getThreadPoolName()
);
threadPoolExecutorMap.put(config.getThreadPoolName(),
threadPoolMonitor);
}

});
}

/**
*调整线程池
*
*@paramdynamicThreadPoolProperties
*/
privatevoidchangeThreadPools(DynamicThreadPoolPropertiesdynamicThreadPoolProperties){
dynamicThreadPoolProperties.getExecutors().forEach(config->{
ThreadPoolExecutorthreadPoolExecutor=threadPoolExecutorMap.get(config.getThreadPoolName());
if(Objects.nonNull(threadPoolExecutor)){
threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());
threadPoolExecutor.setMaximumPoolSize(config.getMaxPoolSize());
threadPoolExecutor.setKeepAliveTime(config.getKeepAliveTime(),config.getUnit());
threadPoolExecutor.setRejectedExecutionHandler(RejectedExecutionHandlerEnum.getRejectedExecutionHandler(config.getRejectedExecutionType()));
BlockingQueue<Runnable>queue=threadPoolExecutor.getQueue();
if(queueinstanceofResizableCapacityLinkedBlockingQueue){
((ResizableCapacityLinkedBlockingQueue<Runnable>)queue).setCapacity(config.getQueueCapacity());
}
}
});
}


@EventListener
publicvoidenvListener(EnvironmentChangeEventevent){
log.info("配置发生变更"+event);
changeThreadPools(dynamicThreadPoolProperties);
}

}

  1. DynamicThreadPoolPropertiesController对外暴露两个方法,第一个通过ContextRefresher提供对外刷新配置的接口,实现及时更新配置信息,第二提供一个查询接口的方法,

/**
*动态修改线程池参数
*
*@authorwangtongzhou
*@since2022-03-1317:27
*/
@RestController
publicclassDynamicThreadPoolPropertiesController{

@Autowired
privateContextRefreshercontextRefresher;


@Autowired
privateDynamicThreadPoolPropertiesdynamicThreadPoolProperties;


@Autowired
privateDynamicThreadPoolManagerdynamicThreadPoolManager;


@PostMapping("/threadPool/properties")
publicvoidupdate(){
ThreadPoolPropertiesthreadPoolProperties=
dynamicThreadPoolProperties.getExecutors().get(0);
threadPoolProperties.setCorePoolSize(20);
threadPoolProperties.setMaxPoolSize(50);
threadPoolProperties.setQueueCapacity(200);
threadPoolProperties.setRejectedExecutionType("CallerRunsPolicy");
contextRefresher.refresh();
}

@GetMapping("/threadPool/properties")
publicMap<String,Object>queryThreadPoolProperties(){
Map<String,Object>metricMap=newHashMap<>();
List<Map>threadPools=newArrayList<>();
dynamicThreadPoolManager.getThreadPoolExecutorMap().forEach((k,v)->{
ThreadPoolMonitorthreadPoolMonitor=(ThreadPoolMonitor)v;
Map<String,Object>poolInfo=newHashMap<>();
poolInfo.put("thread.pool.name",k);
poolInfo.put("thread.pool.core.size",threadPoolMonitor.getCorePoolSize());
poolInfo.put("thread.pool.largest.size",threadPoolMonitor.getLargestPoolSize());
poolInfo.put("thread.pool.max.size",threadPoolMonitor.getMaximumPoolSize());
poolInfo.put("thread.pool.thread.count",threadPoolMonitor.getPoolSize());
poolInfo.put("thread.pool.max.costTime",threadPoolMonitor.getMaxCostTime());
poolInfo.put("thread.pool.average.costTime",threadPoolMonitor.getAverageCostTime());
poolInfo.put("thread.pool.min.costTime",threadPoolMonitor.getMinCostTime());
poolInfo.put("thread.pool.active.count",threadPoolMonitor.getActiveCount());
poolInfo.put("thread.pool.completed.taskCount",threadPoolMonitor.getCompletedTaskCount());
poolInfo.put("thread.pool.queue.name",threadPoolMonitor.getQueue().getClass().getName());
poolInfo.put("thread.pool.rejected.name",threadPoolMonitor.getRejectedExecutionHandler().getClass().getName());
poolInfo.put("thread.pool.task.count",threadPoolMonitor.getTaskCount());
threadPools.add(poolInfo);
});
metricMap.put("threadPools",threadPools);
returnmetricMap;
}

}

整体上的流程到这里就完成了,算是一个Demo版,对于该组件更深入的思考我认为还可以做以下三件事情:

  1. 应该以starter的形式嵌入到应用,通过判断启动类加载的Appllo、Nacos还是默认实现;
  2. 对外可以Push、也可以是日志,还可以支持各种库,提供丰富的输出形式,这个样子的话更加通用化;
  3. 提供统一查询接口、修改接口、增加权限校验、增加预警规则配置;

参考以下内容:

美团文章

结束

欢迎大家点点关注,点点赞!