Java中如何实现多线程定时任务解析?

2026-05-22 06:491阅读0评论SEO资讯
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1702个文字,预计阅读时间需要7分钟。

Java中如何实现多线程定时任务解析?

之前写Java定时任务,用到了ScheduledExecutorService,自己没有思路来实现定时任务的功能,所以十分别好奇其底层代码的实现。于是就去看了源码,在这个过程中还发现了Doug L的存在。

之前写 Java 定时任务 用到了 ScheduledExecutorService,自己没有思路来实现定时任务的功能,所以十分好奇其底层代码的实现,于是乎就去翻看源码,在这过程中还发现了无处不在的 Doug Lea



1. ScheduledExecutorService

ScheduledExecutorService 是 jdk 提供的计划执行服务接口(实现类是个线程池)。这里举例 newScheduledThreadPool 来分析,其是指定核心线程数的计划线程池


1.1 基本使用

public class ExecutorSchedule { public static void main(String[] args) { // 需要计划执行的任务 Runnable runnable = () -> { System.out.println("执行定时任务"); }; // 计划的时间参数 long delay = 0; long period = 1000 * 5; // 计划线程池执行 ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10); service.scheduleAtFixedRate(runnable, delay, period, TimeUnit.MILLISECONDS); }

1.2 构造函数

通过构造函数知道计划执行的关键是参数中的阻塞队列:new DelayedWorkQueue()

// 创建一个计划线程池执行器 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // super 最终指向 ThreadPoolExecutor // 计划线程池调用 ThreadPoolExecutor 方法来构造线程池(阿里规范要手动传参建立线程池) public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } // 默认的构造线程池方法 public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // xxx }







2. DelayedWorkQueue

DelayedWorkQueue 顾名思义是一个延迟队列,用来存放线程池待执行的任务


2.1 继承关系

在 DelayedWorkQueue 的类声明中知道,它是一个阻塞队列(线程池并发时保持数据一致性,这部分内容以后会介绍)

// AbstractQueue:有队列的基本方法 // BlockingQueue:有阻塞队列的基本方法 class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {}

2.2 代码实现

在 DelayedWorkQueue 的具体代码中发现 DelayedWorkQueue 的内部数据结构是堆,里面存放了 RunnableScheduledFuture 对象。然后我们大胆猜测: RunnableScheduledFuture 就是延迟任务的核心,里面存放了延迟时间,DelayedWorkQueue 按照这个延迟时间来进行内部排序

// 堆特有的方法,上浮下潜 private void siftUp(int k, RunnableScheduledFuture<?> key) {} private void siftDown(int k, RunnableScheduledFuture<?> key) {}







3. RunnableScheduledFuture

RunnableScheduledFuture 只是一个接口,ScheduledThreadPoolExecutor 线程池的内部有该接口的实现类(ScheduledFutureTask),我们先来看 RunnableScheduledFuture,然后再分析 ScheduledFutureTask 的接口实现


3.1 接口分析

// RunnableFuture 是异步获取结果的内容 public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {} // Future 是异步获取结果,和 Runnable 对比 public interface ScheduledFuture<V> extends Delayed, Future<V> {} // 这里就是我们要找核心实现了 // 继承了 Comparable,在堆里面可按定延迟时间来排序对比 public interface Delayed extends Comparable<Delayed> { /** * 返回与此对象关联的剩余给定时间单位 * * @param 时间单位 * @return 剩余延迟时间,0或负值表示时间已经过去 */ long getDelay(TimeUnit unit); }

3.2 ScheduledFutureTask

ScheduledFutureTask 的构造函数入参有 Runnable、result、ns:分别是 延迟任务、Future 返回、延迟时间

// FutureTask 是异步任务 class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { private final long sequenceNumber; private long time; private final long period; // 构造方法 ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } // 获取延迟时间(延迟任务的关键) public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 入队的对比方法 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } }

3.3 实现分析

1.1 基本使用:service.scheduleAtFixedRate(runnable, delay, period, TimeUnit.MILLISECONDS) 就将提交的任务与延迟时间封装成 ScheduledFutureTask,

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { // xxx 省略无关代码 // 在执行延时任务的时候,传入 Runnable,延迟参数创建 ScheduledFutureTask 对象 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 装饰器 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 延迟执行 delayedExecute(t); return t; } // 延迟执行具体实现 private void delayedExecute(RunnableScheduledFuture<?> task) { // 判断线程池是否关闭 if (isShutdown()) // 拒绝队列执行 reject(task); else { // 往 workQueue(即 DelayedWorkQueue)里添加 上一步被封装的 ScheduledFutureTask // 至此,该延时任务已经提交到线程池等待执行了 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }







4. 总结过程

  • 我们需要延迟执行的任务被封装成 ScheduledFutureTask
  • 然后被添加到 DelayedWorkQueue 中,队列头部是达到延迟时间的任务(内部堆是按延迟时间排序)
  • 线程池循环执行任务时从 DelayedWorkQueue 中获取,即可实现延迟任务的功能








5. 拓展

参考 ScheduledExecutorService 的设计,我们也能写一个简易的延迟队列出来

Java中如何实现多线程定时任务解析?


5.1 Delayed 对象

public class DelayedTask implements Delayed, Runnable { // 任务名 private String name; // 开始时间 private long start = System.currentTimeMillis(); // 要延迟的时间 private long delayTime; DelayedTask(String name, long delayTime) { this.name = name; this.delayTime = delayTime; } // 返回剩余时间 @Override public long getDelay(TimeUnit unit) { return unit.convert((start + delayTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } // 排序方法 @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public void run() { System.out.println(this.name + " " + "延迟任务执行了"); } }

5.2 延迟线程池

public class DelayedThread extends Thread { private BlockingQueue<DelayedTask> queue; public DelayedThread(BlockingQueue<DelayedTask> queue) { this.queue = queue; } @Override public void run() { DelayedTask target = null; while (!queue.isEmpty()) { try { target = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } target.run(); } } }

5.3 执行事例

public class DelayedTest { public static void main(String[] args) { DelayedTask task1 = new DelayedTask("1", 1000); DelayedTask task2 = new DelayedTask("2", 5000); DelayQueue<DelayedTask> delayQueue = new DelayQueue(); delayQueue.add(task1); delayQueue.add(task2); DelayedThread delayedThread = new DelayedThread(delayQueue); delayedThread.start(); } } // 1 延迟任务执行了 // 2 延迟任务执行了


参考

blog.csdn.net/dkfajsldfsdfsd/article/details/88966814



本文共计1702个文字,预计阅读时间需要7分钟。

Java中如何实现多线程定时任务解析?

之前写Java定时任务,用到了ScheduledExecutorService,自己没有思路来实现定时任务的功能,所以十分别好奇其底层代码的实现。于是就去看了源码,在这个过程中还发现了Doug L的存在。

之前写 Java 定时任务 用到了 ScheduledExecutorService,自己没有思路来实现定时任务的功能,所以十分好奇其底层代码的实现,于是乎就去翻看源码,在这过程中还发现了无处不在的 Doug Lea



1. ScheduledExecutorService

ScheduledExecutorService 是 jdk 提供的计划执行服务接口(实现类是个线程池)。这里举例 newScheduledThreadPool 来分析,其是指定核心线程数的计划线程池


1.1 基本使用

public class ExecutorSchedule { public static void main(String[] args) { // 需要计划执行的任务 Runnable runnable = () -> { System.out.println("执行定时任务"); }; // 计划的时间参数 long delay = 0; long period = 1000 * 5; // 计划线程池执行 ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10); service.scheduleAtFixedRate(runnable, delay, period, TimeUnit.MILLISECONDS); }

1.2 构造函数

通过构造函数知道计划执行的关键是参数中的阻塞队列:new DelayedWorkQueue()

// 创建一个计划线程池执行器 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // super 最终指向 ThreadPoolExecutor // 计划线程池调用 ThreadPoolExecutor 方法来构造线程池(阿里规范要手动传参建立线程池) public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } // 默认的构造线程池方法 public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // xxx }







2. DelayedWorkQueue

DelayedWorkQueue 顾名思义是一个延迟队列,用来存放线程池待执行的任务


2.1 继承关系

在 DelayedWorkQueue 的类声明中知道,它是一个阻塞队列(线程池并发时保持数据一致性,这部分内容以后会介绍)

// AbstractQueue:有队列的基本方法 // BlockingQueue:有阻塞队列的基本方法 class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {}

2.2 代码实现

在 DelayedWorkQueue 的具体代码中发现 DelayedWorkQueue 的内部数据结构是堆,里面存放了 RunnableScheduledFuture 对象。然后我们大胆猜测: RunnableScheduledFuture 就是延迟任务的核心,里面存放了延迟时间,DelayedWorkQueue 按照这个延迟时间来进行内部排序

// 堆特有的方法,上浮下潜 private void siftUp(int k, RunnableScheduledFuture<?> key) {} private void siftDown(int k, RunnableScheduledFuture<?> key) {}







3. RunnableScheduledFuture

RunnableScheduledFuture 只是一个接口,ScheduledThreadPoolExecutor 线程池的内部有该接口的实现类(ScheduledFutureTask),我们先来看 RunnableScheduledFuture,然后再分析 ScheduledFutureTask 的接口实现


3.1 接口分析

// RunnableFuture 是异步获取结果的内容 public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {} // Future 是异步获取结果,和 Runnable 对比 public interface ScheduledFuture<V> extends Delayed, Future<V> {} // 这里就是我们要找核心实现了 // 继承了 Comparable,在堆里面可按定延迟时间来排序对比 public interface Delayed extends Comparable<Delayed> { /** * 返回与此对象关联的剩余给定时间单位 * * @param 时间单位 * @return 剩余延迟时间,0或负值表示时间已经过去 */ long getDelay(TimeUnit unit); }

3.2 ScheduledFutureTask

ScheduledFutureTask 的构造函数入参有 Runnable、result、ns:分别是 延迟任务、Future 返回、延迟时间

// FutureTask 是异步任务 class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { private final long sequenceNumber; private long time; private final long period; // 构造方法 ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } // 获取延迟时间(延迟任务的关键) public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 入队的对比方法 public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } }

3.3 实现分析

1.1 基本使用:service.scheduleAtFixedRate(runnable, delay, period, TimeUnit.MILLISECONDS) 就将提交的任务与延迟时间封装成 ScheduledFutureTask,

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { // xxx 省略无关代码 // 在执行延时任务的时候,传入 Runnable,延迟参数创建 ScheduledFutureTask 对象 ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); // 装饰器 RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; // 延迟执行 delayedExecute(t); return t; } // 延迟执行具体实现 private void delayedExecute(RunnableScheduledFuture<?> task) { // 判断线程池是否关闭 if (isShutdown()) // 拒绝队列执行 reject(task); else { // 往 workQueue(即 DelayedWorkQueue)里添加 上一步被封装的 ScheduledFutureTask // 至此,该延时任务已经提交到线程池等待执行了 super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }







4. 总结过程

  • 我们需要延迟执行的任务被封装成 ScheduledFutureTask
  • 然后被添加到 DelayedWorkQueue 中,队列头部是达到延迟时间的任务(内部堆是按延迟时间排序)
  • 线程池循环执行任务时从 DelayedWorkQueue 中获取,即可实现延迟任务的功能








5. 拓展

参考 ScheduledExecutorService 的设计,我们也能写一个简易的延迟队列出来

Java中如何实现多线程定时任务解析?


5.1 Delayed 对象

public class DelayedTask implements Delayed, Runnable { // 任务名 private String name; // 开始时间 private long start = System.currentTimeMillis(); // 要延迟的时间 private long delayTime; DelayedTask(String name, long delayTime) { this.name = name; this.delayTime = delayTime; } // 返回剩余时间 @Override public long getDelay(TimeUnit unit) { return unit.convert((start + delayTime) - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } // 排序方法 @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public void run() { System.out.println(this.name + " " + "延迟任务执行了"); } }

5.2 延迟线程池

public class DelayedThread extends Thread { private BlockingQueue<DelayedTask> queue; public DelayedThread(BlockingQueue<DelayedTask> queue) { this.queue = queue; } @Override public void run() { DelayedTask target = null; while (!queue.isEmpty()) { try { target = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } target.run(); } } }

5.3 执行事例

public class DelayedTest { public static void main(String[] args) { DelayedTask task1 = new DelayedTask("1", 1000); DelayedTask task2 = new DelayedTask("2", 5000); DelayQueue<DelayedTask> delayQueue = new DelayQueue(); delayQueue.add(task1); delayQueue.add(task2); DelayedThread delayedThread = new DelayedThread(delayQueue); delayedThread.start(); } } // 1 延迟任务执行了 // 2 延迟任务执行了


参考

blog.csdn.net/dkfajsldfsdfsd/article/details/88966814