如何快速学会使用Java ElasticJob实现分布式定时任务?

2026-05-15 22:221阅读0评论SEO资源
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计3466个文字,预计阅读时间需要14分钟。

如何快速学会使用Java ElasticJob实现分布式定时任务?

目录+前言+架构+功能和特性+入门角度+写一个例子+任务执行流程+ScheduleJobBootstrap初始化+ScheduleJobBootstrap执行+执行流程总结+分片的策略+前言+ElasticJob+是面向互联网和海量任务的分布式调度解决方案

目录
  • 前言
  • 架构
  • 功能和特性
  • 入门角色
  • 写个例子
  • 任务执行流程
    • ScheduleJobBootstrap初始化
    • ScheduleJobBootstrap执行
    • 执行流程总结
  • 分片的策略

    前言

    ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、以及任务治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的任务生态。 它的各个产品使用统一的任务 API,开发者仅需一次开发,即可随意部署。

    架构

    elasticjob由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成组成,这是ElasticJob-Lite 的架构图:

    从架构图可以看到,左上角App1和App2两个业务模块中的Elastic-Job往zk中注册了信息,右边的Elastic-Job-Lite是监听了zk的,因此,整个任务的调度是由zk来完成的。下面的console通过Rest API去获取zk中的信息,得到调度数据和日志,并存盘。

    这是ElasticJob-Cloud的架构图:

    ElasticJob-Cloud的调度是依赖Mesos的,从架构图的理解,Mesos和zk结合做好任务调度,再分发给Mesos的代理并执行。

    功能和特性

    以下是ElasticJob的特性优点

    • 支持任务在分布式场景下的分片和高可用
    • 能够水平扩展任务的吞吐量和执行效率
    • 任务处理能力随资源配备弹性伸缩
    • 优化任务和资源调度
    • 相同任务聚合至相同的执行器统一处理
    • 动态调配追加资源至新分配的任务
    • 失效转移
    • 错过任务重新执行
    • 分布式环境下任务自动诊断和修复
    • 基于有向无环图 (DAG) 的任务依赖
    • 基于有向无环图 (DAG) 的任务项目依赖
    • 可扩展的任务类型统一接口
    • 支持丰富的任务类型库--包括数据流、脚本、HTTP、文件、大数据
    • 易于对接业务任务--兼容 Spring IOC
    • 任务管控端
    • 任务事件追踪
    • 注册中心管理

    入门角色

    既然这么多优点,我们就入门试试吧。入门elasticjob-lite也继承了Quartz框架,同样的很简单,只要三个角色:

    SimpleJob:任务主体。如果用过Quartz,那么应该能够理解这个,基本上和Quartz的Job接口类似,只要实现一个execute方法就行了,入门用这个就行;

    JobConfiguration:任务配置。同样的可以理解为类似Quartz框架中的Trigger,最重要的就是配置任务的执行频率;

    ScheduleJobBootstrap:调度主体。这个一样,参考Quartz框架中的Scheduler对象,它把任务和配置结合起来,任务按照配置中的频率执行。

    写个例子

    我们创建这三种角色,首先创建任务主体:

    import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; /** * (这个类的说明) * * @author mars酱 */ public class MarsSimpleJob implements SimpleJob { @Override public void execute(final ShardingContext shardingContext) { System.out.printf("Item: %s | Time: %s | Thread: %s | %s%n", shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "就是这么简单~"); } }

    再创建任务配置:

    import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import javax.sql.DataSource; import java.util.Objects; /** * (这个类的说明) * * @author mars酱 */ public class JobConfigurationBuilder { public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) { JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3) .cron(cronExpression) .shardingItemParameters("0=a,1=b,2=c"); if (Objects.nonNull(tracingConfig)) { builder.addExtraConfigurations(tracingConfig); } return builder.build(); } }

    最后创建调度器,并执行:

    import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import javax.sql.DataSource; /** * (这个类的说明) * * @author mars酱 */ public final class SchedulerMain { private static final int EMBED_ZOOKEEPER_PORT = 4181; private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT; private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java"; // CHECKSTYLE:OFF public static void main(final String[] args) { // 内嵌zk服务 EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT); CoordinatorRegistryCenter regCenter = setUpRegistryCenter(); // 简单作业 setUpSimpleJob(regCenter, null); } private static CoordinatorRegistryCenter setUpRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE); CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); result.init(); return result; } private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) { new ScheduleJobBootstrap(regCenter, new MarsSimpleJob(), JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule(); } }

    运行的效果:

    截图中Item是处理的分片项,Thread是当前线程的id,看到了Quartz框架的影子...。

    任务执行流程

    既然能成功运行,我们看看内部的处理逻辑吧。Mars酱本机并没有安装zk,所以copy了官方的例子,在程序运行前先启用了一个内嵌的zk服务:

    EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);

    这个只能在模拟的时候使用,千万不能拿去放生产环境。接下来就是注册中心的配置了,我们需要的是CoordinatorRegistryCenter对象:

    private static CoordinatorRegistryCenter setUpRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE); CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); result.init(); return result; }

    好了,zk的部分处理完成,下面就是直接SchedulerJobBootstrap的部分了。

    ScheduleJobBootstrap初始化

    ScheduleJobBootstrap的初始化在例子中需要三个参数:

    CoordinatorRegistryCenter:这个是协调用的注册中心。是一个接口类,它的实现在ElasticJob里面只有一个ZookeeperRegisterCenter对象,未来是不是会支持其他的注册中心呢?

    ElasticJob: Mars酱理解为任务对象。但是ElasticJob这个对象本身是个空接口,有两个子接口SimpleJobDataflowJob,前者Mars酱的理解是和Quartz中的Job对象类似,只要实现execute函数就行,后者有需要实现两个接口,一个fetchData获取数据,一个processData处理数据。所以,ElasticJob这个接口留空,是为了还有其他扩展吧?

    JobConfiguration:弹性任务配置项。构建这个对象不能直接设置,只能用buider的方式构建。需要配置的属性很多,但是核心属性大致就是几个:任务名称、分片数、执行频率、分片参数。JobConfiguration的所有属性如下:

    属性名说明String jobName任务名称String croncron表达式String timeZone任务运行的时区int shardingTotalCount任务分片总数String shardingItemParameters分片序号和参数,多个键值对之间用逗号分隔,从0开始,但是不能大于或等于任务分片的总数String jobParameter任务自定义任务参数boolean monitorExecution是否监听执行boolean failover是否启用故障转移。开启表示如果任务在一次任务执行中途宕机,允许将该次未完成的任务在另一任务节点上补偿执行boolean misfire不发火。哈哈,其实是是否开启错过任务重新执行int maxTimeDiffSeconds最大时差int reconcileIntervalMinutes间隔时长String jobShardingStrategyType任务分片策略类型,总共三种String jobExecutorServiceHandlerType任务执行程序服务处理程序类型String jobErrorHandlerType任务错误处理类型Collection jobListenerTypes任务监听类型Collection extraConfigurations附加配置信息String description任务描述Properties props扩展用属性值boolean disabled是否禁用boolean overwrite是否覆盖String label标签boolean staticSharding是否支持静态分片

    如何快速学会使用Java ElasticJob实现分布式定时任务?

    ScheduleJobBootstrap执行

    同样的,例子中的MarsSimpleJob的execute函数,最终会被ElasticJob框架调用,我们按照被执行的反向顺序往上找。MarsSimpleJob是继承SimpleJob的, 而SimpleJob的execute函数是被SimpleJobExecutor所调用:

    /** * Simple job executor. */ public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> { @Override public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) { // 这里调用execute函数 elasticJob.execute(shardingContext); } @Override public Class<SimpleJob> getElasticJobClass() { return SimpleJob.class; } }

    再继续往上找,process的核心流程就是在ElasticJobExecutor里面了,调用process的部分在ElasticJobExcutor中几个重载的process方法调用的,两个process函数完成不同的功能,调用SimpleExecutor的process部分是这样:

    @SuppressWarnings("unchecked") private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) { jobFacade.postJobExecutionEvent(startEvent); log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item); JobExecutionEvent completeEvent; try { // 这里调用SimpleJobExecutor的process jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item)); completeEvent = startEvent.executionSuccess(); log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item); jobFacade.postJobExecutionEvent(completeEvent); // CHECKSTYLE:OFF } catch (final Throwable cause) { // CHECKSTYLE:ON completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause)); jobFacade.postJobExecutionEvent(completeEvent); itemErrorMessages.put(item, ExceptionUtils.transform(cause)); JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class); jobErrorHandler.handleException(jobConfig.getJobName(), cause); } }

    上面这个process负责最终任务的执行部分,由JobItemExecutor对象调用,SimpleJobExecutor被JobItemExecutor接口定义。整个这个proces由guava包的EventBus处理消息事件,执行之前有startEvent,执行完成有completeEvent,异常也有对应的失败event,方面架构图中存盘事件日志、ELK日志收集动作。

    调用这个process的部分,由另一个process完成,长这样的:

    private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet(); if (1 == items.size()) { int item = shardingContexts.getShardingItemParameters().keySet().iterator().next(); JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item); process(jobConfig, shardingContexts, item, jobExecutionEvent); return; } CountDownLatch latch = new CountDownLatch(items.size()); for (int each : items) { JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each); ExecutorService executorService = executorContext.get(ExecutorService.class); if (executorService.isShutdown()) { return; } // 提交给线程池执行 executorService.submit(() -> { try { process(jobConfig, shardingContexts, each, jobExecutionEvent); } finally { latch.countDown(); } }); } try { latch.await(); } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } }

    上面这个process负责把分片参数依次组装好,设置好JobExecutionEvent中的ip、主机名等参数,然后放入线程池中去执行。再往上,看现在这个process被调用的部分:

    private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { if (shardingContexts.getShardingItemParameters().isEmpty()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName())); return; } // 往注册中心注册ShardingContexts信息 jobFacade.registerJobBegin(shardingContexts); String taskId = shardingContexts.getTaskId(); // 发送跟踪日志,标记任务正在运行 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, ""); try { // 调用process process(jobConfig, shardingContexts, executionSource); } finally { // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure // 告知注册中心任务完成 jobFacade.registerJobCompleted(shardingContexts); if (itemErrorMessages.isEmpty()) { // 没有失败信息,通知任务完成 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, ""); } else { // 否则通知失败 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString()); itemErrorMessages.clear(); } } }

    方法execute从注册中心注册ShardingContext信息,并发送跟踪日志事件,然后调用process,最后发送跟踪消息标记任务完成。再有一个重载的execute方法调用上面这个execute方法,如下:

    public void execute() { // job的配置信息 JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true); executorContext.reloadIfNecessary(jobConfig); JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class); try { jobFacade.checkJobExecutionEnvironment(); } catch (final JobExecutionEnvironmentException cause) { jobErrorHandler.handleException(jobConfig.getJobName(), cause); } // 这里有玄机 ShardingContexts shardingContexts = jobFacade.getShardingContexts(); // 发送时间消息总线 jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName())); if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format( "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(), shardingContexts.getShardingItemParameters().keySet())); return; } try { // 任务执行的前置流程 jobFacade.beforeJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobErrorHandler.handleException(jobConfig.getJobName(), cause); } // 调用上面的execute方法 execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER); while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet()); execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE); } // 故障转移 jobFacade.failoverIfNecessary(); try { // 任务执行的后置流程 jobFacade.afterJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobErrorHandler.handleException(jobConfig.getJobName(), cause); } }

    这个execute就由Quartz的JobRunShell调用了,Quartz的调用的过程在 Java | 一分钟掌握定时任务 | 6 - Quartz定时任务里面还好Mars酱分析过了。

    执行流程总结

    那么,追踪完源代码,大致的流程就应该是如下:

    1.组装基本参数(任务、频率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任务属性 -> 4.设置各种facade -> 5.初始化ElasticJobExecutor -> 6.调用scheduler执行任务 -> 7.获取任务执行器(SimpleJobExecutor) -> 8.各种校验逻辑 -> 9. 处理分片参数 -> 10. 设置任务为运行状态 -> 11. 提交任务到线程池 -> 12.执行任务 -> 13.处理任务后续逻辑

    任务的调度过程由zk完成,取决于zk的任务调度策略吧?如果一台机器的定时运行时挂了,zk会转移到另一台运行中的机器中去。-- Mars酱

    分片的策略

    任务的分片策略,用于将任务在分布式环境下分解成为任务使用。

    SPI 名称详细说明JobShardingStrategy作业分片策略接口 已知实现类详细说明AverageAllocationJobShardingStrategy根据分片项平均分片OdevitySortByNameJobShardingStrategy根据任务名称哈希值的奇偶数决定按照任务服务器 IP 升序或是降序的方式分片RoundRobinByNameJobShardingStrategy根据任务名称轮询分片

    那么任务的分片策略在哪里使用的呢?就在代码中注释的“这里有玄机”那行。在getShardingContexts的方法中会调用ShardingService,它会去获取JobConfiguration中配置的分片策略方式:

    public void shardingIfNecessary() { List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances(); if (!isNeedSharding() || availableJobInstances.isEmpty()) { return; } if (!leaderService.isLeaderUntilBlock()) { blockUntilShardingCompleted(); return; } waitingOtherShardingItemCompleted(); JobConfiguration jobConfig = configService.load(false); int shardingTotalCount = jobConfig.getShardingTotalCount(); log.debug("Job '{}' sharding begin.", jobName); jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, ""); resetShardingInfo(shardingTotalCount); // 获取任务分片策略 JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType()); jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))); log.debug("Job '{}' sharding complete.", jobName); }

    如果不设置,默认使用的是平均分片策略。

    以上就是一分钟掌握Java ElasticJob分布式定时任务的详细内容,更多关于Java ElasticJob定时任务的资料请关注自由互联其它相关文章!

    本文共计3466个文字,预计阅读时间需要14分钟。

    如何快速学会使用Java ElasticJob实现分布式定时任务?

    目录+前言+架构+功能和特性+入门角度+写一个例子+任务执行流程+ScheduleJobBootstrap初始化+ScheduleJobBootstrap执行+执行流程总结+分片的策略+前言+ElasticJob+是面向互联网和海量任务的分布式调度解决方案

    目录
    • 前言
    • 架构
    • 功能和特性
    • 入门角色
    • 写个例子
    • 任务执行流程
      • ScheduleJobBootstrap初始化
      • ScheduleJobBootstrap执行
      • 执行流程总结
    • 分片的策略

      前言

      ElasticJob 是面向互联网生态和海量任务的分布式调度解决方案。 它通过弹性调度、资源管控、以及任务治理的功能,打造一个适用于互联网场景的分布式调度解决方案,并通过开放的架构设计,提供多元化的任务生态。 它的各个产品使用统一的任务 API,开发者仅需一次开发,即可随意部署。

      架构

      elasticjob由两个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成组成,这是ElasticJob-Lite 的架构图:

      从架构图可以看到,左上角App1和App2两个业务模块中的Elastic-Job往zk中注册了信息,右边的Elastic-Job-Lite是监听了zk的,因此,整个任务的调度是由zk来完成的。下面的console通过Rest API去获取zk中的信息,得到调度数据和日志,并存盘。

      这是ElasticJob-Cloud的架构图:

      ElasticJob-Cloud的调度是依赖Mesos的,从架构图的理解,Mesos和zk结合做好任务调度,再分发给Mesos的代理并执行。

      功能和特性

      以下是ElasticJob的特性优点

      • 支持任务在分布式场景下的分片和高可用
      • 能够水平扩展任务的吞吐量和执行效率
      • 任务处理能力随资源配备弹性伸缩
      • 优化任务和资源调度
      • 相同任务聚合至相同的执行器统一处理
      • 动态调配追加资源至新分配的任务
      • 失效转移
      • 错过任务重新执行
      • 分布式环境下任务自动诊断和修复
      • 基于有向无环图 (DAG) 的任务依赖
      • 基于有向无环图 (DAG) 的任务项目依赖
      • 可扩展的任务类型统一接口
      • 支持丰富的任务类型库--包括数据流、脚本、HTTP、文件、大数据
      • 易于对接业务任务--兼容 Spring IOC
      • 任务管控端
      • 任务事件追踪
      • 注册中心管理

      入门角色

      既然这么多优点,我们就入门试试吧。入门elasticjob-lite也继承了Quartz框架,同样的很简单,只要三个角色:

      SimpleJob:任务主体。如果用过Quartz,那么应该能够理解这个,基本上和Quartz的Job接口类似,只要实现一个execute方法就行了,入门用这个就行;

      JobConfiguration:任务配置。同样的可以理解为类似Quartz框架中的Trigger,最重要的就是配置任务的执行频率;

      ScheduleJobBootstrap:调度主体。这个一样,参考Quartz框架中的Scheduler对象,它把任务和配置结合起来,任务按照配置中的频率执行。

      写个例子

      我们创建这三种角色,首先创建任务主体:

      import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; /** * (这个类的说明) * * @author mars酱 */ public class MarsSimpleJob implements SimpleJob { @Override public void execute(final ShardingContext shardingContext) { System.out.printf("Item: %s | Time: %s | Thread: %s | %s%n", shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "就是这么简单~"); } }

      再创建任务配置:

      import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import javax.sql.DataSource; import java.util.Objects; /** * (这个类的说明) * * @author mars酱 */ public class JobConfigurationBuilder { public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) { JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3) .cron(cronExpression) .shardingItemParameters("0=a,1=b,2=c"); if (Objects.nonNull(tracingConfig)) { builder.addExtraConfigurations(tracingConfig); } return builder.build(); } }

      最后创建调度器,并执行:

      import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration; import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter; import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration; import javax.sql.DataSource; /** * (这个类的说明) * * @author mars酱 */ public final class SchedulerMain { private static final int EMBED_ZOOKEEPER_PORT = 4181; private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT; private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java"; // CHECKSTYLE:OFF public static void main(final String[] args) { // 内嵌zk服务 EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT); CoordinatorRegistryCenter regCenter = setUpRegistryCenter(); // 简单作业 setUpSimpleJob(regCenter, null); } private static CoordinatorRegistryCenter setUpRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE); CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); result.init(); return result; } private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) { new ScheduleJobBootstrap(regCenter, new MarsSimpleJob(), JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule(); } }

      运行的效果:

      截图中Item是处理的分片项,Thread是当前线程的id,看到了Quartz框架的影子...。

      任务执行流程

      既然能成功运行,我们看看内部的处理逻辑吧。Mars酱本机并没有安装zk,所以copy了官方的例子,在程序运行前先启用了一个内嵌的zk服务:

      EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);

      这个只能在模拟的时候使用,千万不能拿去放生产环境。接下来就是注册中心的配置了,我们需要的是CoordinatorRegistryCenter对象:

      private static CoordinatorRegistryCenter setUpRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE); CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig); result.init(); return result; }

      好了,zk的部分处理完成,下面就是直接SchedulerJobBootstrap的部分了。

      ScheduleJobBootstrap初始化

      ScheduleJobBootstrap的初始化在例子中需要三个参数:

      CoordinatorRegistryCenter:这个是协调用的注册中心。是一个接口类,它的实现在ElasticJob里面只有一个ZookeeperRegisterCenter对象,未来是不是会支持其他的注册中心呢?

      ElasticJob: Mars酱理解为任务对象。但是ElasticJob这个对象本身是个空接口,有两个子接口SimpleJobDataflowJob,前者Mars酱的理解是和Quartz中的Job对象类似,只要实现execute函数就行,后者有需要实现两个接口,一个fetchData获取数据,一个processData处理数据。所以,ElasticJob这个接口留空,是为了还有其他扩展吧?

      JobConfiguration:弹性任务配置项。构建这个对象不能直接设置,只能用buider的方式构建。需要配置的属性很多,但是核心属性大致就是几个:任务名称、分片数、执行频率、分片参数。JobConfiguration的所有属性如下:

      属性名说明String jobName任务名称String croncron表达式String timeZone任务运行的时区int shardingTotalCount任务分片总数String shardingItemParameters分片序号和参数,多个键值对之间用逗号分隔,从0开始,但是不能大于或等于任务分片的总数String jobParameter任务自定义任务参数boolean monitorExecution是否监听执行boolean failover是否启用故障转移。开启表示如果任务在一次任务执行中途宕机,允许将该次未完成的任务在另一任务节点上补偿执行boolean misfire不发火。哈哈,其实是是否开启错过任务重新执行int maxTimeDiffSeconds最大时差int reconcileIntervalMinutes间隔时长String jobShardingStrategyType任务分片策略类型,总共三种String jobExecutorServiceHandlerType任务执行程序服务处理程序类型String jobErrorHandlerType任务错误处理类型Collection jobListenerTypes任务监听类型Collection extraConfigurations附加配置信息String description任务描述Properties props扩展用属性值boolean disabled是否禁用boolean overwrite是否覆盖String label标签boolean staticSharding是否支持静态分片

      如何快速学会使用Java ElasticJob实现分布式定时任务?

      ScheduleJobBootstrap执行

      同样的,例子中的MarsSimpleJob的execute函数,最终会被ElasticJob框架调用,我们按照被执行的反向顺序往上找。MarsSimpleJob是继承SimpleJob的, 而SimpleJob的execute函数是被SimpleJobExecutor所调用:

      /** * Simple job executor. */ public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> { @Override public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) { // 这里调用execute函数 elasticJob.execute(shardingContext); } @Override public Class<SimpleJob> getElasticJobClass() { return SimpleJob.class; } }

      再继续往上找,process的核心流程就是在ElasticJobExecutor里面了,调用process的部分在ElasticJobExcutor中几个重载的process方法调用的,两个process函数完成不同的功能,调用SimpleExecutor的process部分是这样:

      @SuppressWarnings("unchecked") private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) { jobFacade.postJobExecutionEvent(startEvent); log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item); JobExecutionEvent completeEvent; try { // 这里调用SimpleJobExecutor的process jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item)); completeEvent = startEvent.executionSuccess(); log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item); jobFacade.postJobExecutionEvent(completeEvent); // CHECKSTYLE:OFF } catch (final Throwable cause) { // CHECKSTYLE:ON completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause)); jobFacade.postJobExecutionEvent(completeEvent); itemErrorMessages.put(item, ExceptionUtils.transform(cause)); JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class); jobErrorHandler.handleException(jobConfig.getJobName(), cause); } }

      上面这个process负责最终任务的执行部分,由JobItemExecutor对象调用,SimpleJobExecutor被JobItemExecutor接口定义。整个这个proces由guava包的EventBus处理消息事件,执行之前有startEvent,执行完成有completeEvent,异常也有对应的失败event,方面架构图中存盘事件日志、ELK日志收集动作。

      调用这个process的部分,由另一个process完成,长这样的:

      private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet(); if (1 == items.size()) { int item = shardingContexts.getShardingItemParameters().keySet().iterator().next(); JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item); process(jobConfig, shardingContexts, item, jobExecutionEvent); return; } CountDownLatch latch = new CountDownLatch(items.size()); for (int each : items) { JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each); ExecutorService executorService = executorContext.get(ExecutorService.class); if (executorService.isShutdown()) { return; } // 提交给线程池执行 executorService.submit(() -> { try { process(jobConfig, shardingContexts, each, jobExecutionEvent); } finally { latch.countDown(); } }); } try { latch.await(); } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } }

      上面这个process负责把分片参数依次组装好,设置好JobExecutionEvent中的ip、主机名等参数,然后放入线程池中去执行。再往上,看现在这个process被调用的部分:

      private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) { if (shardingContexts.getShardingItemParameters().isEmpty()) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName())); return; } // 往注册中心注册ShardingContexts信息 jobFacade.registerJobBegin(shardingContexts); String taskId = shardingContexts.getTaskId(); // 发送跟踪日志,标记任务正在运行 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, ""); try { // 调用process process(jobConfig, shardingContexts, executionSource); } finally { // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure // 告知注册中心任务完成 jobFacade.registerJobCompleted(shardingContexts); if (itemErrorMessages.isEmpty()) { // 没有失败信息,通知任务完成 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, ""); } else { // 否则通知失败 jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString()); itemErrorMessages.clear(); } } }

      方法execute从注册中心注册ShardingContext信息,并发送跟踪日志事件,然后调用process,最后发送跟踪消息标记任务完成。再有一个重载的execute方法调用上面这个execute方法,如下:

      public void execute() { // job的配置信息 JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true); executorContext.reloadIfNecessary(jobConfig); JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class); try { jobFacade.checkJobExecutionEnvironment(); } catch (final JobExecutionEnvironmentException cause) { jobErrorHandler.handleException(jobConfig.getJobName(), cause); } // 这里有玄机 ShardingContexts shardingContexts = jobFacade.getShardingContexts(); // 发送时间消息总线 jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName())); if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format( "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobConfig.getJobName(), shardingContexts.getShardingItemParameters().keySet())); return; } try { // 任务执行的前置流程 jobFacade.beforeJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobErrorHandler.handleException(jobConfig.getJobName(), cause); } // 调用上面的execute方法 execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER); while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) { jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet()); execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE); } // 故障转移 jobFacade.failoverIfNecessary(); try { // 任务执行的后置流程 jobFacade.afterJobExecuted(shardingContexts); //CHECKSTYLE:OFF } catch (final Throwable cause) { //CHECKSTYLE:ON jobErrorHandler.handleException(jobConfig.getJobName(), cause); } }

      这个execute就由Quartz的JobRunShell调用了,Quartz的调用的过程在 Java | 一分钟掌握定时任务 | 6 - Quartz定时任务里面还好Mars酱分析过了。

      执行流程总结

      那么,追踪完源代码,大致的流程就应该是如下:

      1.组装基本参数(任务、频率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任务属性 -> 4.设置各种facade -> 5.初始化ElasticJobExecutor -> 6.调用scheduler执行任务 -> 7.获取任务执行器(SimpleJobExecutor) -> 8.各种校验逻辑 -> 9. 处理分片参数 -> 10. 设置任务为运行状态 -> 11. 提交任务到线程池 -> 12.执行任务 -> 13.处理任务后续逻辑

      任务的调度过程由zk完成,取决于zk的任务调度策略吧?如果一台机器的定时运行时挂了,zk会转移到另一台运行中的机器中去。-- Mars酱

      分片的策略

      任务的分片策略,用于将任务在分布式环境下分解成为任务使用。

      SPI 名称详细说明JobShardingStrategy作业分片策略接口 已知实现类详细说明AverageAllocationJobShardingStrategy根据分片项平均分片OdevitySortByNameJobShardingStrategy根据任务名称哈希值的奇偶数决定按照任务服务器 IP 升序或是降序的方式分片RoundRobinByNameJobShardingStrategy根据任务名称轮询分片

      那么任务的分片策略在哪里使用的呢?就在代码中注释的“这里有玄机”那行。在getShardingContexts的方法中会调用ShardingService,它会去获取JobConfiguration中配置的分片策略方式:

      public void shardingIfNecessary() { List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances(); if (!isNeedSharding() || availableJobInstances.isEmpty()) { return; } if (!leaderService.isLeaderUntilBlock()) { blockUntilShardingCompleted(); return; } waitingOtherShardingItemCompleted(); JobConfiguration jobConfig = configService.load(false); int shardingTotalCount = jobConfig.getShardingTotalCount(); log.debug("Job '{}' sharding begin.", jobName); jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, ""); resetShardingInfo(shardingTotalCount); // 获取任务分片策略 JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType()); jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount))); log.debug("Job '{}' sharding complete.", jobName); }

      如果不设置,默认使用的是平均分片策略。

      以上就是一分钟掌握Java ElasticJob分布式定时任务的详细内容,更多关于Java ElasticJob定时任务的资料请关注自由互联其它相关文章!