分布式调度框架架构实战:Quartz、XXL-Job、PowerJob企业级任务调度完整解决方案

一、调度框架概述

1.1 调度框架定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
调度框架核心概念:
定时任务:
- 按时间计划执行任务
- Cron表达式定义执行时间
- 一次性或周期性执行

分布式调度:
- 多节点部署
- 任务分片执行
- 高可用保障
- 动态任务管理

应用场景:
- 数据同步
- 报表生成
- 定时清理
- 数据统计
- 消息推送

1.2 常见调度框架对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
调度框架对比:
Quartz:
- 优点: 功能强大,成熟稳定
- 缺点: 配置复杂,无web界面
- 适用: 企业级应用
- 集群: 支持数据库集群

XXL-Job:
- 优点: 简单易用,Web管理界面
- 缺点: 性能一般
- 适用: 中小型企业
- 集群: 支持调度中心+执行器

Elastic-Job:
- 优点: 基于Elasticsearch,动态分片
- 缺点: 已停止更新
- 适用: 分片任务场景
- 集群: 支持Zookeeper集群

PowerJob:
- 优点: 高性能,功能丰富
- 缺点: 相对较新
- 适用: 大规模调度
- 集群: 支持Server+Worker架构

Spring Task:
- 优点: 简单轻量,Spring集成
- 缺点: 不支持分布式
- 适用: 单机场景
- 集群: 不支持

二、Quartz分布式调度

2.1 Quartz基础配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<!-- pom.xml -->
<dependencies>
<!-- Quartz -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.2</version>
</dependency>

<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.3.2</version>
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// QuartzConfig.java
@Configuration
public class QuartzConfig {

@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();

// 数据源配置
factory.setDataSource(dataSource);

// 事务管理器
factory.setTransactionManager(transactionManager);

// Job存储配置
Properties props = new Properties();
props.put("org.quartz.scheduler.instanceName", "MyScheduler");
props.put("org.quartz.scheduler.instanceId", "AUTO");

// JobStore配置(集群模式)
props.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
props.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
props.put("org.quartz.jobStore.isClustered", "true");
props.put("org.quartz.jobStore.clusterCheckinInterval", "20000");

// 线程池配置
props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
props.put("org.quartz.threadPool.threadCount", "10");
props.put("org.quartz.threadPool.threadPriority", "5");

factory.setQuartzProperties(props);

return factory;
}

@Bean
public Scheduler scheduler(SchedulerFactoryBean factory) throws Exception {
return factory.getScheduler();
}
}

2.2 创建Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// SimpleJob.java
public class SimpleJob implements Job {

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("执行定时任务: " + new Date());

// 获取JobDataMap
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String param = dataMap.getString("param");

// 执行业务逻辑
doBusiness(param);
}

private void doBusiness(String param) {
// 业务逻辑
System.out.println("处理业务: " + param);
}
}

// SpringBootJob.java
@Component
public class SpringBootJob implements Job {

@Autowired
private UserService userService;

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
// 可以使用注入的服务
List<User> users = userService.getAllUsers();
System.out.println("用户数量: " + users.size());
}
}

2.3 调度Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// JobScheduler.java
@Service
public class JobScheduler {

@Autowired
private Scheduler scheduler;

/**
* 创建简单调度任务
*/
public void scheduleSimpleJob() throws SchedulerException {
// 定义Job
JobDetail jobDetail = JobBuilder.newJob(SimpleJob.class)
.withIdentity("simpleJob", "group1")
.withDescription("简单定时任务")
.usingJobData("param", "value")
.build();

// 定义Trigger(每5秒执行一次)
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("simpleTrigger", "group1")
.startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(5)
.repeatForever())
.build();

// 调度任务
scheduler.scheduleJob(jobDetail, trigger);
}

/**
* 创建Cron调度任务
*/
public void scheduleCronJob() throws SchedulerException {
JobDetail jobDetail = JobBuilder.newJob(SimpleJob.class)
.withIdentity("cronJob", "group2")
.build();

// 每天凌晨2点执行
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("cronTrigger", "group2")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0 2 * * ?"))
.build();

scheduler.scheduleJob(jobDetail, trigger);
}

/**
* 动态创建任务
*/
public void addJob(String jobName, String groupName, String cron) throws SchedulerException {
JobDetail jobDetail = JobBuilder.newJob(SimpleJob.class)
.withIdentity(jobName, groupName)
.build();

CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(jobName + "Trigger", groupName)
.withSchedule(CronScheduleBuilder.cronSchedule(cron))
.build();

scheduler.scheduleJob(jobDetail, trigger);
}

/**
* 删除任务
*/
public void deleteJob(String jobName, String groupName) throws SchedulerException {
scheduler.deleteJob(new JobKey(jobName, groupName));
}

/**
* 暂停任务
*/
public void pauseJob(String jobName, String groupName) throws SchedulerException {
scheduler.pauseJob(new JobKey(jobName, groupName));
}

/**
* 恢复任务
*/
public void resumeJob(String jobName, String groupName) throws SchedulerException {
scheduler.resumeJob(new JobKey(jobName, groupName));
}

/**
* 立即执行任务
*/
public void triggerJob(String jobName, String groupName) throws SchedulerException {
scheduler.triggerJob(new JobKey(jobName, groupName));
}
}

三、XXL-Job分布式调度

3.1 XXL-Job配置

1
2
3
4
5
6
7
8
<!-- pom.xml -->
<dependencies>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.0</version>
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
# application.yml
xxl:
job:
admin:
addresses: http://localhost:8080/xxl-job-admin
executor:
appname: xxl-job-executor-sample
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// XxlJobConfig.java
@Configuration
public class XxlJobConfig {

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;

@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
}

3.2 实现JobHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// SampleJobHandler.java
@Component
public class SampleJobHandler {

/**
* 简单任务
*/
@XxlJob("sampleJob")
public void sampleJob() {
System.out.println("执行简单任务: " + new Date());
}

/**
* 分片任务
*/
@XxlJob("shardingJob")
public void shardingJob() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

System.out.println("分片索引: " + shardIndex + ", 总分片: " + shardTotal);

// 处理分片数据
List<String> dataList = getDataList();

for (int i = shardIndex; i < dataList.size(); i += shardTotal) {
String data = dataList.get(i);
processData(data);
}
}

/**
* 带参数的任务
*/
@XxlJob("paramJob")
public void paramJob() {
String param = XxlJobHelper.getJobParam();
System.out.println("任务参数: " + param);

// 使用参数执行任务
processWithParam(param);
}

/**
* 捕获异常的任务
*/
@XxlJob("exceptionJob")
public void exceptionJob() {
try {
// 执行业务逻辑
doBusiness();
XxlJobHelper.handleSuccess("任务执行成功");
} catch (Exception e) {
XxlJobHelper.handleFail("任务执行失败: " + e.getMessage());
}
}

private List<String> getDataList() {
// 获取数据列表
return Arrays.asList("data1", "data2", "data3", "data4", "data5");
}

private void processData(String data) {
// 处理数据
System.out.println("处理数据: " + data);
}

private void processWithParam(String param) {
// 使用参数处理
System.out.println("处理参数: " + param);
}

private void doBusiness() {
// 业务逻辑
}
}

四、PowerJob分布式调度

4.1 PowerJob配置

1
2
3
4
5
6
7
8
<!-- pom.xml -->
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>4.3.6</version>
</dependency>
</dependencies>
1
2
3
4
5
6
7
8
9
10
11
12
# application.yml
powerjob:
worker:
enabled: true
server-address: 127.0.0.1:7700
app-name: powerjob-worker-sample
port: 27777
# 网络配置
max-result-length: 4096000
# 处理器配置
max-lightweight-task-num: 1024
max-heavy-task-num: 64
1
2
3
4
5
6
7
8
9
// PowerJobConfig.java
@Configuration
public class PowerJobConfig {

@Bean
public PowerJobBanner banner() {
return new PowerJobBanner();
}
}

4.2 实现Processor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// SimpleProcessor.java
@Component
public class SimpleProcessor implements BasicProcessor {

@Override
public ProcessResult process(TaskContext context) throws Exception {
System.out.println("执行PowerJob任务");

String jobParams = context.getJobParams();
System.out.println("任务参数: " + jobParams);

// 执行业务逻辑
doBusiness(jobParams);

// 返回结果
return new ProcessResult(true, "任务执行成功");
}

private void doBusiness(String params) {
// 业务逻辑
System.out.println("处理业务: " + params);
}
}

// Processor注解方式
@Component
public class AnnotationProcessor {

@PowerJobHandler("annotationJob")
public void annotationJob(TaskContext context) {
System.out.println("执行注解任务");
}

@PowerJobHandler("mapProcessor")
@MapProcessor(maxSubTaskNum = 100)
public ProcessResult mapProcessor(TaskContext context) {
// Map-Reduce任务
String jobParams = context.getJobParams();

// 拆分任务
List<String> subTasks = splitTasks(jobParams);

// 返回子任务
return new ProcessResult(true, "准备执行子任务", subTasks);
}

private List<String> splitTasks(String params) {
// 拆分任务逻辑
return Arrays.asList("task1", "task2", "task3");
}
}

五、集群部署配置

5.1 Quartz集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# quartz.properties
# 调度器配置
org.quartz.scheduler.instanceName = MyScheduler
org.quartz.scheduler.instanceId = AUTO

# 集群配置
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 20000
org.quartz.jobStore.maxMisfiresToHandleAtATime = 1
org.quartz.jobStore.misfireThreshold = 120000

# 数据源配置
org.quartz.jobStore.dataSource = myDS
org.quartz.dataSource.myDS.driver = com.mysql.cj.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3306/quartz?useUnicode=true&characterEncoding=utf8
org.quartz.dataSource.myDS.user = root
org.quartz.dataSource.myDS.password = password
org.quartz.dataSource.myDS.maxConnections = 10

5.2 XXL-Job集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 调度中心部署
// 1. 下载xxl-job-admin
// 2. 配置数据库
// 3. 启动调度中心

// 执行器配置
@Configuration
public class XxlJobClusterConfig {

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor executor = new XxlJobSpringExecutor();

// 调度中心地址
executor.setAdminAddresses(adminAddresses);

// 执行器配置
executor.setAppname("xxl-job-executor");
executor.setIp(""); // 自动获取
executor.setPort(9999); // 执行器端口
executor.setAccessToken(""); // 访问令牌
executor.setLogPath("/data/applogs/xxl-job");
executor.setLogRetentionDays(30);

return executor;
}
}

5.3 PowerJob集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# Worker配置
powerjob:
worker:
app-name: powerjob-worker
server-address: 127.0.0.1:7700
# 端口配置
port: 27777
# 最大结果长度
max-result-length: 4096000
# 任务线程池
worker-omp-pool-size: 8
max-instance-num: 1024

# Server配置
powerjob:
server:
port: 7700
# 数据库配置
jdbc:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/powerjob-server?useUnicode=true&characterEncoding=utf8
username: root
password: password
# 配置中心
store-strategy: disk
store-strategy-disk:
backup-dir: /data/powerjob

六、高可用架构设计

6.1 调度中心高可用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
graph TB
A[调度中心1] -->|主| B[Nginx]
A -->|备| B
C[调度中心2] -->|备| B
C -->|主| B

B --> D[执行器集群]
D --> E[Worker 1]
D --> F[Worker 2]
D --> G[Worker 3]

E --> H[业务系统]
F --> H
G --> H

6.2 执行器高可用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 执行器高可用配置
@Configuration
public class ExecutorHAConfig {

/**
* 配置多台执行器
*/
@Bean
public XxlJobSpringExecutor executor1() {
XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
executor.setAppname("xxl-job-executor-1");
executor.setAdminAddresses("http://scheduler:8080/xxl-job-admin");
executor.setPort(9999);
return executor;
}

@Bean
public XxlJobSpringExecutor executor2() {
XxlJobSpringExecutor executor = new XxlJobSpringExecutor();
executor.setAppname("xxl-job-executor-2");
executor.setAdminAddresses("http://scheduler:8080/xxl-job-admin");
executor.setPort(9998);
return executor;
}
}

七、任务分片实现

7.1 Quartz分片任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// ShardingJob.java
public class ShardingJob implements Job {

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
int shardIndex = context.getMergedJobDataMap().getInt("shardIndex");
int shardTotal = context.getMergedJobDataMap().getInt("shardTotal");

List<String> dataList = getDataList();

for (int i = shardIndex; i < dataList.size(); i += shardTotal) {
String data = dataList.get(i);
processData(data);
}
}

private List<String> getDataList() {
// 获取数据
return new ArrayList<>();
}

private void processData(String data) {
// 处理数据
}
}

// 分片调度
@Service
public class ShardScheduler {

@Autowired
private Scheduler scheduler;

public void scheduleShardJob(int shardTotal) throws SchedulerException {
for (int i = 0; i < shardTotal; i++) {
int shardIndex = i;

JobDetail jobDetail = JobBuilder.newJob(ShardingJob.class)
.withIdentity("shardJob-" + i, "group1")
.usingJobData("shardIndex", shardIndex)
.usingJobData("shardTotal", shardTotal)
.build();

Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("shardTrigger-" + i, "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0/5 * * * ?"))
.build();

scheduler.scheduleJob(jobDetail, trigger);
}
}
}

7.2 XXL-Job分片任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// ShardingJobHandler.java
@Component
public class ShardingJobHandler {

@XxlJob("shardingJob")
public void shardingJob() {
// 获取分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

System.out.println("分片索引: " + shardIndex);
System.out.println("总分片: " + shardTotal);

// 处理分片数据
List<String> dataList = getDataList();

for (int i = shardIndex; i < dataList.size(); i += shardTotal) {
String data = dataList.get(i);
processData(data);
}
}

private List<String> getDataList() {
// 获取数据列表
return Arrays.asList("data1", "data2", "data3", "data4", "data5");
}

private void processData(String data) {
// 处理数据
System.out.println("处理: " + data);
}
}

八、监控与管理

8.1 任务监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// JobMonitor.java
@Component
public class JobMonitor {

@Autowired
private Scheduler scheduler;

/**
* 监控任务执行状态
*/
@Scheduled(fixedDelay = 5000)
public void monitorJobs() throws SchedulerException {
List<String> groupNames = scheduler.getJobGroupNames();

for (String groupName : groupNames) {
Set<JobKey> jobKeys = scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName));

for (JobKey jobKey : jobKeys) {
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);

for (Trigger trigger : triggers) {
TriggerState state = scheduler.getTriggerState(trigger.getKey());

System.out.println("Job: " + jobKey.getName() +
", State: " + state);
}
}
}
}
}

8.2 失败任务重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// RetryJob.java
@Component
public class RetryJob implements Job {

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
// 执行业务逻辑
doBusiness();

} catch (Exception e) {
// 判断是否需要重试
if (shouldRetry(context)) {
// 调度重试任务
scheduleRetry(context);
}

throw new JobExecutionException(e);
}
}

private boolean shouldRetry(JobExecutionContext context) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
int retryCount = dataMap.getInt("retryCount");

return retryCount < 3; // 最多重试3次
}

private void scheduleRetry(JobExecutionContext context) {
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
int retryCount = dataMap.getInt("retryCount");
dataMap.put("retryCount", retryCount + 1);
}

private void doBusiness() {
// 业务逻辑
}
}

九、最佳实践

9.1 实践建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
调度框架最佳实践:
1. 任务设计:
- 任务尽量轻量
- 避免长任务
- 支持幂等性
- 处理异常情况

2. 分片策略:
- 合理设置分片数
- 考虑数据分布
- 避免数据倾斜

3. 监控告警:
- 监控任务执行
- 告警失败任务
- 统计执行时间

4. 高可用:
- 多实例部署
- 负载均衡
- 故障转移

5. 性能优化:
- 避免并发冲突
- 优化数据库查询
- 合理使用缓存

9.2 选型建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
框架选型:
选择Quartz:
- 需要复杂调度逻辑
- 已有Spring集成
- 需要精细控制

选择XXL-Job:
- 需要Web管理界面
- 简单快速部署
- 中小型项目

选择PowerJob:
- 大规模调度需求
- 需要Map-Reduce
- 高性能要求

选择Elastic-Job:
- 需要动态分片
- 已有Zookeeper
- 复杂任务场景

十、总结

10.1 核心要点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
调度框架核心:
分布式部署:
- 调度中心集群
- 执行器集群
- 数据持久化

任务调度:
- Cron表达式
- 分片执行
- 动态管理

高可用:
- 故障转移
- 负载均衡
- 监控告警

性能优化:
- 合理分片
- 异步执行
- 资源隔离

10.2 实践建议

  1. 优先满足需求:按场景选择框架
  2. 监控优先:监控任务执行与异常
  3. 高可用部署:多实例与故障转移
  4. 性能优化:合理分片与异步处理
  5. 定期维护:清理失败任务与更新调度策略

选择合适的调度框架可提升系统可靠性。