elasticJob实战,各作业类型使用及所产生的问题分析

    xiaoxiao2025-08-26  4

    该篇文章主要关注于SimpleJob、DataflowJob、ScriptJobs三种任务的具体实现及相关问题。 采用Elastic-Job-Lite方案进行。以下例子均在SpringBoot项目中进行试验,并使用bean的配置,替代原有xml配置形式。主要使用Elastic-Job-Lite方案进行试验。 Elastic-Job-Lite方案与Elastic-Job-Cloud方案基本的作业是一样的,使用同一套API开发作业,开发者仅需一次开发,即可根据需要以Lite或Cloud的方式部署;唯一区别在于应用环境有所不同。

    1. 一些概念及特点(看过的可略过)

    概念 Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。 Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。 Elastic-Job-Cloud使用Mesos + Docker的解决方案,额外提供资源治理、应用分发以及进程隔离等服务,采用中心化调度实现难度小于Elastic-Job-Lite的无中心化调度,无需再考虑多线程并发的情况。

    特点 Elastic-Job-Lite 分布式调度协调、弹性扩容缩容、失效转移、错过执行作业重触发、作业分片一致性,保证同一分片在分布式环境中仅一个执行实例、自诊断并修复分布式不稳定造成的问题、支持并行调度、支持作业生命周期操作、丰富的作业类型、Spring整合以及命名空间提供、运维平台 Elastic-Job-Cloud 应用自动分发 、基于Fenzo的弹性资源分配、分布式调度协调、弹性扩容缩容、失效转移、错过执行作业重触发、作业分片一致性,保证同一分片在分布式环境中仅一个执行实例、支持并行调度、支持作业生命周期操作、丰富的作业类型、Spring整合、运维平台 、基于Docker的进程隔离(TBD)

    2. 实战准备

    前提先准备好Zookeeper环境 ,下载地址:https://archive.apache.org/dist/zookeeper/ 选择最新的即可,解压后重命名conf/zoo-example.cfg文件为conf/zoo.cfg,可根据情况修改配置信息,比如端口、数据地址

    # The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/local/zookeeper-3.4.9/data # the port at whic # the maximum numh the clients will connect clientPort=2181 # ber of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1

    使用命令./zkServer.sh start启动即可

    Zookeeper在这的目的之一在于服务发现。如果一个项目有多个实例运行,并需要job进行分布式调度,使用相同的Zookeeper配置即可,Elastic-Job会根据Zookeeper中注册的服务信息发现其它服务,进行任务调度。

    现在过程 引入依赖:

    <!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-core --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> <!-- https://mvnrepository.com/artifact/com.dangdang/elastic-job-lite-spring --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>

    官方未提供相关SpringBoot的start包,但可以考虑三方的:https://github.com/yinjihuan/elastic-job-spring-boot-starter 这里不以此为例。

    elastic-job提供三种作业执行器:

    作业作业接口执行器简单作业SimpleJobSimpleJobExecutor数据流作业DataflowJobDataflowJobExecutor脚本作业ScriptJobScriptJobExecutor

    这些任务作用该怎么用,有哪些注意点?以下一一通过示例方式进行讲解。

    3. SimpleJob

    simpleJob和我们平常所写的job基本无太多区别,主要需要实现simpleJob接口

    3.1 配置Zookeeper

    @Bean(initMethod = "init") public ZookeeperRegistryCenter zookeeperRegistryCenter() { ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("127.0.0.1:2181", "elasticJob-simpleTask"); ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zkConfig); return zookeeperRegistryCenter; }

    根据情况调整Zookeeper地址及命名空间,命名空间主要用于区分任务

    3.2 任务创建

    @Component @Slf4j public class SimpleJobTest implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { log.info("-----任务开始执行-----" + LocalTime.now().toString()); log.info(shardingContext.toString()); log.info("-----任务执行完成-----" + LocalTime.now().toString()); } }

    创建一个简单的任务,实现SimpleJob接口,execute为具体执行的业务逻辑

    3.3 对该任务进行配置

    @Bean(initMethod = "init") public JobScheduler jobScheduler(SimpleJob simpleJob, ZookeeperRegistryCenter regCenter) { SpringJobScheduler springJobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration()); return springJobScheduler; } public LiteJobConfiguration liteJobConfiguration() { JobCoreConfiguration.Builder jobConfig0 = JobCoreConfiguration.newBuilder("jobParameter1", "0/10 * * * * ?", 3).shardingItemParameters("0=a,1=b,2=c").jobParameter("param"); SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobConfig0.build(), SimpleJobTest.class.getCanonicalName()); LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(simpleJobConfiguration); return builder.build(); }

    配置任务的时候,这里定义了四个参数,分别是: cron :cron表达式,用于控制作业触发时间。 shardingTotalCount :作业分片总数 shardingItemParameters :分片序列号和参数用等号分隔,多个键值对用逗号分隔分片序列号从0开始,不可大于或等于作业分片总数 如:0=a,1=b,2=c jobParameters :作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业

    例:每次获取的数据量、作业实例从数据库读取的主键等 更多job相关的参数详见附录。

    3.4 执行试一试

    启动项目时,会看到如下运行状态

    Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally. NOT STARTED. Currently in standby mode. Number of jobs executed: 0 Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 1 threads. Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

    启动后,会看到我们执行的内容,和我们设置的cron一样,10s执行一次。由于设置了分片数为3,意味着将会有三个线程同时执行jobParameter1任务。 唯一区别在于他们的参数shardingItem及shardingParameter有所不同。我们即可更加这些参数进行相应的逻辑处理。比如根据这些参数在做分页区分,快速跑完数据。

    [jobParameter1-3] c.example.elasticjob.job0.SimpleJobTest : -----任务开始执行-----17:35:20.022 [jobParameter1-7] c.example.elasticjob.job0.SimpleJobTest : -----任务开始执行-----17:35:20.022 [jobParameter1-8] c.example.elasticjob.job0.SimpleJobTest : -----任务开始执行-----17:35:20.022 [jobParameter1-7] c.example.elasticjob.job0.SimpleJobTest : ShardingContext(jobName=jobParameter1, taskId=jobParameter1@-@0,1,2@-@READY@-@192.168.1.34@-@3188, shardingTotalCount=3, jobParameter=param, shardingItem=0, shardingParameter=a) [jobParameter1-3] c.example.elasticjob.job0.SimpleJobTest : ShardingContext(jobName=jobParameter1, taskId=jobParameter1@-@0,1,2@-@READY@-@192.168.1.34@-@3188, shardingTotalCount=3, jobParameter=param, shardingItem=2, shardingParameter=c) [jobParameter1-8] c.example.elasticjob.job0.SimpleJobTest : ShardingContext(jobName=jobParameter1, taskId=jobParameter1@-@0,1,2@-@READY@-@192.168.1.34@-@3188, shardingTotalCount=3, jobParameter=param, shardingItem=1, shardingParameter=b) [jobParameter1-7] c.example.elasticjob.job0.SimpleJobTest : -----任务执行完成-----17:35:20.022 [jobParameter1-8] c.example.elasticjob.job0.SimpleJobTest : -----任务执行完成-----17:35:20.022 [jobParameter1-3] c.example.elasticjob.job0.SimpleJobTest : -----任务执行完成-----17:35:20.022

    3.5 一些问题

    问题一:当你jobName相同,修改其它参数时,你会发现,重新启动应用执行并没有生效,依然保留原有参数。这是因为? 原因:zookeeper 注册中心会持久化这个分片相关信息。

    解决办法:overwrite设置为true即可,如下设置

    以上liteJobConfiguration配置中做如下修改

    LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true);

    问题二:如果启动两个相同任务名的job,并且SimpleJob实例就1个会怎样?

    代码上我们做如下调整,两个JobScheduler分别将param设置为param0,param1其它配置完全相同

    @Bean(initMethod = "init") public JobScheduler jobScheduler0(SimpleJob simpleJob, ZookeeperRegistryCenter regCenter) { SpringJobScheduler springJobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration("jobParameter0", "0/10 * * * * ?", 1,"param0")); return springJobScheduler; } @Bean(initMethod = "init") public JobScheduler jobScheduler1(SimpleJob simpleJob, ZookeeperRegistryCenter regCenter) { SpringJobScheduler springJobScheduler = new SpringJobScheduler(simpleJob, regCenter, liteJobConfiguration("jobParameter0", "0/10 * * * * ?", 1,"param1")); return springJobScheduler; }public LiteJobConfiguration liteJobConfiguration(String jobName, String cron, int shardingTotalCount,String param) { JobCoreConfiguration.Builder jobConfig0 = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount) .jobParameter(param) .description("just a simpleJobTest"); SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobConfig0.build(), SimpleJobTest.class.getCanonicalName()); LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true); return builder.build(); }

    此时你会发现,job执行最后被实例化的jobScheduler2的内容,如果将overwrite(true)去了,将会惊奇发现日志中打印为param0的数据,也就是jobScheduler2并没有如愿以偿的覆盖原有任务参数。 这个比较我们只能证overwrite(true)的作用,假如:

    问题三:如果两个不同任务名,并且SimpleJob实例就1个那会怎样? 在以上代码基础上将jobScheduler1中的任务名改成jobParameter1,其它不变,此时我们再来看看会是怎样

    --- [meter0_Worker-1] c.example.elasticjob.job0.SimpleJobTest : -----任务开始执行-----22:57:40.080 --- [meter1_Worker-1] c.example.elasticjob.job0.SimpleJobTest : -----任务开始执行-----22:57:40.080 --- [meter0_Worker-1] c.example.elasticjob.job0.SimpleJobTest : ShardingContext(jobName=jobParameter0, taskId=jobParameter0@-@0@-@READY@-@192.168.1.34@-@3407, shardingTotalCount=1, jobParameter=param0, shardingItem=0, shardingParameter=null) --- [meter1_Worker-1] c.example.elasticjob.job0.SimpleJobTest : ShardingContext(jobName=jobParameter1, taskId=jobParameter1@-@0@-@READY@-@192.168.1.34@-@3407, shardingTotalCount=1, jobParameter=param1, shardingItem=0, shardingParameter=null) --- [meter0_Worker-1] c.example.elasticjob.job0.SimpleJobTest : -----任务执行完成-----22:57:40.080 --- [meter1_Worker-1] c.example.elasticjob.job0.SimpleJobTest : -----任务执行完成-----22:57:40.080

    此时你会发现两个任务同时执行,也就说明了,任务名不同他们之间将会相互独立执行任务,就好像有两个SimpleJob实例一样。我们看看他的源码就知道是怎么一个回事了: 构造器

    private JobScheduler(final CoordinatorRegistryCenter regCenter, final LiteJobConfiguration liteJobConfig, final JobEventBus jobEventBus, final ElasticJobListener... elasticJobListeners) { JobRegistry.getInstance().addJobInstance(liteJobConfig.getJobName(), new JobInstance()); this.liteJobConfig = liteJobConfig; this.regCenter = regCenter; List elasticJobListenerList = Arrays.asList(elasticJobListeners); setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList); schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList); jobFacade = new LiteJobFacade(regCenter, liteJobConfig.getJobName(), Arrays.asList(elasticJobListeners), jobEventBus); }

    其中你会看到一个addJobInstance方法参数分别是我们设置的JobName和SimpleJob实例,再点进去看看

    /** * 添加作业实例. */ public void addJobInstance(final String jobName, final JobInstance jobInstance) { jobInstanceMap.put(jobName, jobInstance); }

    可以看出有个map,以我们的jobName做key,Value为我们的Job实例。我想到这里你也明白了为什么会出现以上想象,并不是有两个实例,而是相当于同一时间有两个执行了我们的SimpleJob。

    3.6 可能遇到问题

    一次将写好测试过的相关Task代码移动另一个包下,发现运行就出差了,如下异常

    Error creating bean with name ‘jobScheduler’ defined in class path resource [com/example/demo/testelasticjob/config/ConfigBean.class]: Invocation of init method failed; nested exception is com.dangdang.ddframe.job.exception.JobConfigurationException: Job conflict with register center. The job ‘jobParameter0’ in register center’s class is ‘com.example.elasticjob.job0.SimpleJobTest’, your job class is 'com.example.demo.testelasticjob.job.SimpleJobTest’

    从错误中我们看出因为移动之后job ‘jobParameter0 ’所注册的SimpleJobTest所对应的包与现有不一致导致的该错误。但是奇怪的是重启应用依然无济于事,将overwrite设置为true依然如此,这是为什么,那该怎么办?

    了解一下作业启动做了什么 elastic job通过下面代码启动作业: new JobScheduler(createRegistryCenter(), createJobConfiguration()).init(); 首先会更新作业的配置 schedulerFacade.updateJobConfiguration(liteJobConfig); 该方法会做两件事情: 1、当前实例作业配置持久化到zookeeper; 2、从zookeeper重新加载作业配置覆盖当前实例作业配置。 以上问题即在持久化配置时会进行作业配置的校验,发现已经注册中的jobParameter0的配置与当前所要更新的obParameter0不一致,如以上SimpleJobTest包名不一致,而抛出上面所示的一次。

    解决办法: 1.修改job名称如’jobParameter0’ 改为’jobParameterA’ 2.在Elastic-Job-Lite-console的控制台中进行删除或修改操作

    也许你有一大堆数据要处理,比如将一些关系型数据库中的大量数据导入到非关系型数据库中,那该怎么?用simple略显得麻烦些,那么可以考虑一些下面的?DataflowJob。

    4. DataflowJob

    通常用于不间歇性的数据处理任务。由于作业需要重新分片,所以不适合继续流式数据处理。 特定在于不断的加载数据,并且不断处理数据,直到数据为空 或者 作业不适合继续运行。思路参照了TbSchedule设计方式。 如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 当作业配置不设置流式处理数据(DataflowJobConfiguration.streamingProcess = false) 时,调用 oneOffExecute()方法一次加载数据,一次处理数据。

    4.1dataflowJob实例

    业务实现包含两个接口:抓取(fetchData)和处理(processData)数据

    流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业 创建一个DataflowJobTest实现DataflowJob接口,如下实现

    @Component public class DataflowJobTest implements DataflowJob { private int count;@Override public List fetchData(ShardingContext shardingContext) { List datas = new ArrayList<>(); for (int i = 0; i < RandomUtils.nextInt(1, 10); i++) { datas.add(MessageFormat.format("jobName:{0},data:{1}", shardingContext.getJobName(), count++)); } System.out.println("fetchData ok"); return datas; }@Override public void processData(ShardingContext shardingContext, List data) { System.out.println("deal data:" + data); } }

    内容很简单,fetchData负责生产我们所要处理的数据;fetchData用于消费我们的数据。 示例中每次进行fechData时会往datas中筛入随机长度的数据,并用count标记每一条数据。fetchData中打印我们要处理的数据。

    4.2 配置

    到此dataflow的一个任务的主要内容就OK了,当然不要忘记和simpleJob一样去配置我们的任务

    /******* dataflowJob ******/@Bean(initMethod = "init") public JobScheduler dataflowJobScheduler(DataflowJob dataflowJob, ZookeeperRegistryCenter regCenter) { SpringJobScheduler springJobScheduler = new SpringJobScheduler(dataflowJob, regCenter, dataflowJobConfiguration("dataflowJob", "0/10 * * * * ?", 1, "param")); return springJobScheduler; }public LiteJobConfiguration dataflowJobConfiguration(String jobName, String cron, int shardingTotalCount, String param) { JobCoreConfiguration.Builder jobConfig = JobCoreConfiguration.newBuilder(jobName, cron, shardingTotalCount) .jobParameter(param) .description("just a dataflowJobTest"); DataflowJobConfiguration dataflowJobConfiguration = new DataflowJobConfiguration(jobConfig.build(), DataflowJobTest.class.getCanonicalName(),true); LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(dataflowJobConfiguration).overwrite(true); return builder.build(); }

    基本上与SimpleJob的任务配置相同,唯一区别在于需要使用DataflowJobConfiguration。 特别需要注意DataflowJobConfiguration中第三个参数streamingProcess,该参数用于是否需要进行流处理,说白点就是是否需要不断生产数据并消费数据。

    4.3 执行看看

    1. streamingProcess=true时

    fetchData ok deal data:[jobName:dataflowJobTest0,data:46, jobName:dataflowJobTest0,data:47] fetchData ok deal data:[jobName:dataflowJobTest0,data:48, jobName:dataflowJobTest0,data:49, jobName:dataflowJobTest0,data:50] fetchData ok deal data:[jobName:dataflowJobTest0,data:51, jobName:dataflowJobTest0,data:52, jobName:dataflowJobTest0,data:53, jobName:dataflowJobTest0,data:54, jobName:dataflowJobTest0,data:55, jobName:dataflowJobTest0,data:56] fetchData ok deal data:[jobName:dataflowJobTest0,data:57, jobName:dataflowJobTest0,data:58] fetchData ok deal data:[jobName:dataflowJobTest0,data:59] fetchData ok deal data:[jobName:dataflowJobTest0,data:60, jobName:dataflowJobTest0,data:61, jobName:dataflowJobTest0,data:62] fetchData ok

    从日志上看,任务fetchData与processData会循环执行一直执行,我们可以从count标志位看出并没有出现并发问题,可见每次只会在同一线程中执行一次fetchData-processData 也就是循环

    问题一:fetchData-processData真的是循环执行吗? 带着疑问看看代码就行知道,进入DataflowJobExecutor类,可以显眼的看到process方法

    @Override protected void process(final ShardingContext shardingContext) { DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobRootConfig().getTypeConfig(); if (dataflowConfig.isStreamingProcess()) { streamingExecute(shardingContext); } else { oneOffExecute(shardingContext); } }

    streamingExecute与oneOffExecute分别用于流处理与单条处理,继续往下看streamingExecute实现

    private void streamingExecute(final ShardingContext shardingContext) { List data = fetchData(shardingContext); while (null != data && !data.isEmpty()) { processData(shardingContext, data); if (!getJobFacade().isEligibleForJobRunning()) { break; } data = fetchData(shardingContext); } } private List fetchData(final ShardingContext shardingContext) { return dataflowJob.fetchData(shardingContext); }

    从代码中while (null != data && !data.isEmpty())看出只要每次有fetch到数据,就会不间断的执行下去,知道抓取的数据为空为止,可以将示例代码中RandomUtils.nextInt(1, 10)改成RandomUtils.nextInt(0, 10)试试就知道了,这里不再演示。

    问题二:你会问那设置的cron定时有什么用? 当进行fetch到数据为空时,该次任务停止了,只有等待cron所设置的下一次定时重新激活该任务。

    2. streamingProcess=false时 日志情况

    fetchData ok data-long:4 deal data:[jobName:dataflowJobTest0,data:0, jobName:dataflowJobTest0,data:1, jobName:dataflowJobTest0,data:2, jobName:dataflowJobTest0,data:3]

    你会发现每次任务fetchData与processData只会执行一次,正如源码所写。

    private void oneOffExecute(final ShardingContext shardingContext) { List data = fetchData(shardingContext); if (null != data && !data.isEmpty()) { processData(shardingContext, data); } }

    也许有些重复的工作不太适合适应java代码进行重新,比如我要每10s定时查看一些服务器上的tomcat、mysql、mongodb等等进程的情况,那该怎么办?适应java代码略显麻烦,那么可以使用下面的?ScripJob

    5. ScripJob

    Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。代码中配置scriptCommandLine参数指定所要执行的脚本,无需编码。 Job的信息会以JSON 格式的数据作为参数传递。 ScripJob是什么?说明白点,就是定时执行你的脚本。

    5.1 实例

    创建ScripJob的执行任务,其实很简单,我们只需要做好配置即可,如下所示:

    @Bean(initMethod = "init") public JobScheduler scriptJobScheduler(ZookeeperRegistryCenter regCenter) { return new JobScheduler(regCenter, scriptJobConfiguration()); }public LiteJobConfiguration scriptJobConfiguration() { JobCoreConfiguration.Builder jobConfig0 = JobCoreConfiguration.newBuilder("jobParameterC", "0/10 * * * * ?", 1) .jobParameter("param1") .description("just a scriptJobTest"); // 指定脚本 ScriptJobConfiguration jobConfiguration = new ScriptJobConfiguration(jobConfig0.build(), "/Users/hgspiece" + "/Documents/job.sh");//"echo \"hello job\"" LiteJobConfiguration.Builder builder = LiteJobConfiguration.newBuilder(jobConfiguration).overwrite(true); return builder.build(); }

    从代码上看与SimpleJob的配置基本相似,区别在于ScripJob需要使用到ScriptJobConfiguration的配置,并且不需要有具体job实现类,但是需要指定执行的脚本。scriptCommandLine参数指定脚本。

    友情提醒:所指定的脚本可以是一窜指令 也可以是脚本的所在目录

    当scriptCommandLine为内容是如"echo \"hello job\""此时job执行后可以看到如下打印内容

    "hello job" {"jobName":"jobParameterC","taskId":"jobParameterC@-@0@-@READY@-@192.168.1.63@-@8298","shardingTotalCount":1,"jobParameter":"param1","shardingItem":0}

    可见job仅仅输出了hello job,还以json格式打印了我们所配置的job信息。

    一些问题

    问题一:如果换成shell脚本文件会怎样? 这里shell文件内容如下,一段非常简单的内容,打印当前时间(job.sh)

    echo "job start..." echo "当前时间:"date echo "job end..."

    job执行后台可以看到如下内容:

    job start... 当前时间:Wed May 22 22:52:00 CST 2019 job end...

    job算是正常执行了,但你会发现json格式的配置信息没有再打印了,我想这是必然。

    问题二:我们没有在shell中显示job信息,那如何获得job信息呢?

    其实他会作为参数传递给shell,我们只需要去第一个参数即可,job.sh如下改动

    echo "job start..." echo "job param:"$1 echo "当前时间:"date echo "job end..."

    再看看打印内容,此时得到我们的job配置信息

    job start... job param:{"jobName":"jobParameterC","taskId":"jobParameterC@-@0@-@READY@-@192.168.1.63@-@8335","shardingTotalCount":1,"jobParameter":"param1","shardingItem":0} 当前时间:Wed May 22 23:25:10 CST 2019 job end...

    6. 可视化任务管理

    随着任务不断增多,更好任务管理变得更加的重要,如果还是在代码上进行操作修改将变得异常麻烦。 这里将引入可视化任务管理工具:elastic-job-lite-console 可以直接从我上传的地方下载:https://download.csdn.net/download/it_faquir/11206134 也可以通过编译源码方式打包,略显麻烦。

    6.1安装console

    到github上将源码下载下载:https://github.com/elasticjob/elastic-job-lite

    完成后解压,通过maven指令进行打包:mvn package -Dmaven.test.skip=true

    注意: 打包前最好看看项目中pom文件指定的sping版本和maven版本,它是设置成范围了,导致你打包超慢,下载一大堆多余的Sping不同版本的包,只需要改成你本地常用的版本即可。作者这点没说,有点坑。

    打包完成后,在../elastic-job-lite-dev/elastic-job-lite-console/target 会看到console打包好的jar以及一个elastic-job-lite-console-3.0.0.M1-SNAPSHOT.tar.gz包 这个tar.gz包则是我们所需要的,将其解压后可以看到以下目录 此时只需要执行shart.sh/start.bat即可。默认端口号为8899 启动好后,可通过http://127.0.0.1:8899访问试试 默认情况console提供了管理员和游客两个身份密码:

    root.username=root root.password=root guest.username=guest guest.password=guest

    如果需要修改密码可以在启动之前修改conf/auth.properties文件的内容。 当你访问后发现之前所配置的一些任务都没有,这里要知道elastic-job-lite-console是完全独立的公众平台,与我们的任务业务没有紧耦合。因此我们可以在 全局配置 》注册中心配置 添加我们的配置中心信息 完成后,点击相应的连接按钮即可,再回到作业操作 》作业维度 即可看到我们的任务了 包括了作业的增删改查操作等等 具体就不在细说了,自己动手试试吧。

    OK,elasticJob的主要使用就讲完了,希望对大家有所帮助。

    附录:

    config节点 作业配置信息,以JSON格式存储

    JSON属性名描述jobName作业名称,为Elastic-Job唯一标识jobClass作业实现类名称jobType作业类型,目前支持SIMPLE,DATAFLOW和SCRIPT三种类型cron作业启动时间的cron表达式shardingTotalCount作业分片总数,修改会导致运行中作业重新分片shardingItemParameters分片序列号和个性化参数对照表jobParameter作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业 例:每次获取的数据量、作业实例从数据库读取的主键等failover是否开启失效转移,仅monitorExecution开启时才起作用misfire是否开启错过任务重新执行description作业描述信息disabled作业服务器状态是否禁用,可用于部署作业时,先禁止启动,部署结束后统一启动monitorExecution监控作业执行时状态maxTimeDiffSeconds允许的本机与注册中心的时间误差秒数monitorPort使用dump命令的端口,为-1则表示不开启端口streamingProcess是否流式处理数据,如果流式处理数据,则fetchData不返回空结果将持续执行作业,如果非流式处理数据, 则处理数据完成后作业结束jobShardingStrategyClass作业分片策略类的全路径scriptCommandLineSCRIPT型作业作业执行命令行jobEventConfigs作业执行事件追踪,可配置log,rdb两种方式,rdb方式需配置driverClassName,url,username,password,logLeveljobProperties作业定制化属性,目前支持job_exception_handler和executor_service_handler,用于扩展异常处理和自定义作业处理线程池

    servers节点 作业服务器信息,子节点是作业服务器的IP地址。IP地址节点的子节点存储详细信息。同一台作业服务器只能运行一个相同的作业实例,因为作业运行时是按照IP注册和管理的。

    子节点名临时节点描述hostName否作业服务器名称status是作业服务器状态,分为READY和RUNNING,用于表示服务器在等待执行作业还是正在执行作业,如果status节点不存在则表示作业服务器未上线disabled否作业服务器状态是否禁用,可用于部署作业时,先禁止启动,部署结束后统一启动sharding否该作业服务器分到的作业分片项,多个分片项用逗号分隔,如:0, 1, 2代表该服务器执行第1, 2, 3片分片paused否暂停作业的标记,暂停的作业不会终止调度器运行。作业程序再次启动时不会清理此标记shutdown否关闭作业的标记,关闭的作业将停止调度,并可通过控制台删除。只有作业程序再次启动时才会清理此标记trigger否立刻触发作业的标记,作业在不与上次运行中作业冲突的情况下将立刻启动,并在启动后自动清理此标记

    execution节点 执行时信息,子节点是分片项序号,从零开始,至分片总数减一。分片项序号的子节点存储详细信息。可通过配置config\monitorExecution为false关闭记录作业执行时信息。

    子节点名临时节点描述running是分片项正在运行的状态,如果没有此节点,并且没有completed节点,表示该分片未运行completed否分片项运行完成的状态,下次作业开始执行时会清理failover是如果该分片项被失效转移分配给其他作业服务器,则此节点值记录执行此分片的作业服务器IPlastBeginTime否该分片项最近一次的开始执行时间nextFireTime否该分片项下次作业触发时间lastCompleteTime否该分片项最近一次的结束执行时间misfire否是否开启错过任务重新执行

    leader节点 作业服务器主节点信息,分为election,sharding和execution三个子节点。分别用于主节点选举,分片和作业执行时处理。 leader节点是内部使用的节点,如果对作业框架原理不感兴趣,可不关注此节点。

    子节点名临时节点描述election\host是主节点服务器IP地址一旦该节点被删除将会触发重新选举,重新选举的过程中一切主节点相关的操作都将阻塞election\latch否主节点选举的分布式锁为curator的分布式锁使用sharding\necessary否是否需要重新分片的标记,如果分片总数变化,或作业服务器节点上下线或启用/禁用,以及主节点选举,会触发设置重分片标记作业在下次执行时使用主节点重新分片,且中间不会被打断作业执行时不会触发分片sharding\processing是主节点在分片时持有的节点,如果有此节点,所有的作业执行都将阻塞,直至分片结束主节点分片结束或主节点崩溃会删除此临时节点execution\necessary否是否需要修正作业执行时分片项信息的标记,如果分片总数变化,会触发设置修正分片项信息标记,作业在下次执行时会增加或减少分片项数量execution\cleaning是主节点在清理上次作业运行时状态时所持有的节点,每次开始新作业都需要清理上次运行完成的作业信息,如果有此节点,所有的作业执行都将阻塞,直至清理结束,主节点分片结束或主节点崩溃会删除此临时节点failover\items\分片项否一旦有作业崩溃,则会向此节点记录,当有空闲作业服务器时,会从此节点抓取需失效转移的作业项failover\items\latch否分配失效转移分片项时占用的分布式锁为curator的分布式锁使用
    最新回复(0)