Categories
程式開發

整合Elastic-Job(支持動態任務)


最近公司的項目需要用到分佈式任務調度,在結合多款開源框架後決定使用噹噹的Elastic-job。 不知道大家有沒有這樣的需求,就是動態任務。 之前比較了xxl-job和elastic-job發現,都只是支持註解或者配置以及後台添加現有的任務,不支持動態添加。 比如:類似訂單半小時後自動取消的場景。

xxl-job理論上來說是可以支持的,但是需要高度整合admin端的程序,然後開放對應的接口才可以給其他服務調用,這樣本質直接改源碼對後期的升級十分不便,最後放棄了xxl-job。 elastic-job在移交Apache後的版本規劃中,有提到API的開放,但是目前還沒有穩定版,所以只能使用之前的2.1.5的版本來做。 在Github搜了很多整合方案,最後決定選擇下面的來實現。


com.github.xjzrc.spring.boot
elastic-job-lite-spring-boot-starter
${lasted.release.version}

整合Elastic-Job(支持動態任務) 1

因為要做的是動態的,所以這裡沒有直接使用maven坐標引入,直接將源碼全部接入項目來使用,這樣比較靈活,因為底層本質上還是用elastic-job的東西。 下面引入elastic-job坐標


com.dangdang
elastic-job-lite-spring
${elastic-job.version}

com.dangdang
elastic-job-lite-lifecycle
${elastic-job.version}

org.eclipse.jetty.aggregate
jetty-all-server

org.apache.curator
curator-framework

org.apache.curator
curator-recipes

在整合完上面的源碼後,就可以直接支持配置式的定時任務了,只需要修改服務的config即可生效,但是要做到動態式的添加和刪除就必須在實現一個動態的實現。

首先在ElasticJobAutoConfiguration新增一個Bean

/**
* 动态任务初始化
* @return
*/
@Bean(initMethod = "init")
@ConditionalOnMissingBean
public DynamicJobInitialization dynamicJobInitialization() {
return new DynamicJobInitialization(this.regCenter());
}

然後實現動態的類

/**
* 动态任务初始化(支持简单、流式任务)
* @author Zzq
* @date 2020/9/14 19:22
*/
@Slf4j
public class DynamicJobInitialization extends AbstractJobInitialization {

private JobStatisticsAPI jobStatisticsAPI;
private JobSettingsAPI jobSettingsAPI;

public DynamicJobInitialization(ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.jobStatisticsAPI = new JobStatisticsAPIImpl(zookeeperRegistryCenter);
this.jobSettingsAPI = new JobSettingsAPIImpl(zookeeperRegistryCenter);
}

public void init() {
Collection allJob = jobStatisticsAPI.getAllJobsBriefInfo();
if (CollUtil.isNotEmpty(allJob)) {
allJob.forEach(jobInfo -> {
// 已下线的任务
if (JobBriefInfo.JobStatus.CRASHED.equals(jobInfo.getStatus())) {
try {
Date currentDate = new Date();
CronExpression cronExpression = new CronExpression(jobInfo.getCron());
Date nextValidTimeAfter = cronExpression.getNextValidTimeAfter(currentDate);
// 表达式还生效的任务
if (ObjectUtil.isNotNull(nextValidTimeAfter)) {
this.initJobHandler(jobInfo.getJobName());
}
} catch (ParseException e) {
log.error(e.getMessage(), e);
}
}
});
}
}

/**
* 初始化任务操作
* @param jobName 任务名
*/
private void initJobHandler(String jobName) {
try {
JobSettings jobSetting = jobSettingsAPI.getJobSettings(jobName);
if (ObjectUtil.isNotNull(jobSetting)) {
String jobCode = StrUtil.subBefore(jobSetting.getJobName(), StrUtil.UNDERLINE, false);
JobClassEnum jobClassEnum = JobClassEnum.convert(jobCode);
if (ObjectUtil.isNotNull(jobClassEnum)) {
ElasticJobProperties.JobConfiguration configuration = new ElasticJobProperties.JobConfiguration();
configuration.setCron(jobSetting.getCron());
configuration.setJobParameter(jobSetting.getJobParameter());
configuration.setShardingTotalCount(jobSetting.getShardingTotalCount());
configuration.setDescription(jobSetting.getDescription());
configuration.setShardingItemParameters(jobSetting.getShardingItemParameters());
configuration.setJobClass(jobClassEnum.getClazz().getCanonicalName());
super.initJob(jobName, JobType.valueOf(jobSetting.getJobType()), configuration);
}
}
} catch (Exception e) {
log.error("初始化任务操作失败: {}", e.getMessage(), e);
}
}

/**
* 保存/更新任务
* @param job
* @param jobClass
*/
public void addOrUpdateJob(Job job, Class jobClass) {
ElasticJobProperties.JobConfiguration configuration = new ElasticJobProperties.JobConfiguration();
configuration.setCron(job.getCron());
configuration.setJobParameter(job.getJobParameter());
configuration.setShardingTotalCount(job.getShardingTotalCount());
configuration.setShardingItemParameters(job.getShardingItemParameters());
configuration.setJobClass(jobClass.getCanonicalName());
super.initJob(job.getJobName(), JobType.valueOf(job.getJobType()), configuration);
}

@Override
public JobTypeConfiguration getJobTypeConfiguration(String jobName, JobType jobType, JobCoreConfiguration jobCoreConfiguration) {
String jobCode = StrUtil.subBefore(jobName, StrUtil.UNDERLINE, false);
JobClassEnum jobClassEnum = JobClassEnum.convert(jobCode);
if (ObjectUtil.isNotNull(jobClassEnum)) {
if (JobType.SIMPLE.equals(jobType)) {
return new SimpleJobConfiguration(jobCoreConfiguration, jobClassEnum.getClazz().getCanonicalName());
} else if (JobType.DATAFLOW.equals(jobType)) {
return new DataflowJobConfiguration(jobCoreConfiguration, jobClassEnum.getClazz().getCanonicalName(), false);
}
}
return null;
}
}

為什麼是這樣的實現? 我發現每次重新發布服務後,現在的未執行的任務都會變成“已下線”,這可能跟Zookeeper有關,需要重新初始化才行,對於註解和配置式的,會自動初始化,但是動態添加的不會自動初始化。 所以必須自己初始化,之前有個思路是自己建張表來維護定時,每次啟動時進行初始化,但是這樣太麻煩,後來實現使用elastic-job現有的API來實現,即啟動時,遍歷Zookeeper已有的節點,然後判斷Cron表達式是否過期,如果還沒有過期,則重新初始化任務,初始化時配置設置了會覆蓋原來的配置,所以不會有影響。 然後外層可以通過MQ來新增任務,在通過服務調用去指定對應的定時邏輯即可。

(不知道大家有沒有更好的實現方案,可以初始化動態任務的)

而配置式的,可以直接在配置文件指定並實現即可

spring:
elasticjob:
#注册中心配置
zookeeper:
server-lists: 127.0.0.1:6181
namespace: elastic-job-spring-boot-stater-demo
#简单作业配置
simples:
#spring简单作业示例配置
spring-simple-job:
#配置简单作业,必须实现com.dangdang.ddframe.job.api.simple.SimpleJob
job-class: com.zen.spring.boot.demo.elasticjob.job.SpringSimpleJob
cron: 0/2 * * * * ?
sharding-total-count: 3
sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
#配置监听器
listener:
#配置每台作业节点均执行的监听,必须实现com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener
listener-class: com.zen.spring.boot.demo.elasticjob.listener.MyElasticJobListener
#流式作业配置
dataflows:
#spring简单作业示例配置
spring-dataflow-job:
#配置简单作业,必须实现com.dangdang.ddframe.job.api.dataflow.DataflowJob
job-class: com.zen.spring.boot.demo.elasticjob.job.SpringDataflowJob
cron: 0/2 * * * * ?
sharding-total-count: 3
sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
streaming-process: true
#配置监听器
listener:
#配置分布式场景中仅单一节点执行的监听,必须实现com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener
distributed-listener-class: com.zen.spring.boot.demo.elasticjob.listener.MyDistributeElasticJobListener
started-timeout-milliseconds: 5000
completed-timeout-milliseconds: 10000
#脚本作业配置
scripts:
#脚本作业示例配置
script-job:
cron: 0/2 * * * * ?
sharding-total-count: 3
sharding-item-parameters: 0=Beijing,1=Shanghai,2=Guangzhou
script-command-line: youPath/spring-boot-starter-demo/elastic-job-spring-boot-starter-demo/src/main/resources/script/demo.bat

以上整合基本可以滿足現在的使用,比較期待移交Apache後的3的版本,這樣可以有更多API的支持,而不用自己造輪子。