1. XXL-JOB分片机制概述

XXL-JOB是一个分布式任务调度平台,其分片机制能够将大任务拆分成多个小任务,在多个执行器实例上并行执行,显著提升任务处理效率。本文将详细介绍XXL-JOB的分片机制、多实例并行处理策略和最佳实践。

1.1 分片核心概念

  1. 分片总数: 任务被分成的总片数
  2. 分片序号: 当前执行器处理的分片编号(从0开始)
  3. 执行器实例: 运行任务的多个服务实例
  4. 分片策略: 数据分片和任务分配策略

1.2 分片架构

1
2
3
4
5
XXL-JOB调度中心 → 分片任务分发 → 执行器实例1 → 分片0
↓ ↓
分片策略计算 → 执行器实例2 → 分片1
↓ ↓
任务状态同步 → 执行器实例3 → 分片2

2. XXL-JOB配置与部署

2.1 执行器配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* XXL-JOB执行器配置
*/
@Configuration
public class XxlJobConfig {

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;

/**
* 执行器配置
*/
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

return xxlJobSpringExecutor;
}
}

2.2 应用配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# application.yml
xxl:
job:
admin:
addresses: http://localhost:8080/xxl-job-admin
executor:
appname: xxl-job-executor-sample
address:
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
accessToken: default_token

3. 基础分片任务实现

3.1 简单分片任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
* 简单分片任务示例
*/
@Component
public class SimpleShardingJob {

private static final Logger log = LoggerFactory.getLogger(SimpleShardingJob.class);

/**
* 分片任务执行
* @param shardIndex 分片序号
* @param shardTotal 分片总数
*/
@XxlJob("simpleShardingJob")
public void simpleShardingJob() {
// 获取分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

log.info("开始执行分片任务: shardIndex={}, shardTotal={}", shardIndex, shardTotal);

try {
// 根据分片参数处理数据
processDataByShard(shardIndex, shardTotal);

log.info("分片任务执行完成: shardIndex={}, shardTotal={}", shardIndex, shardTotal);

} catch (Exception e) {
log.error("分片任务执行失败: shardIndex={}, shardTotal={}", shardIndex, shardTotal, e);
XxlJobHelper.handleFail("分片任务执行失败: " + e.getMessage());
}
}

/**
* 根据分片处理数据
* @param shardIndex 分片序号
* @param shardTotal 分片总数
*/
private void processDataByShard(int shardIndex, int shardTotal) {
// 模拟数据列表
List<String> dataList = generateDataList();

// 计算当前分片需要处理的数据
List<String> shardData = calculateShardData(dataList, shardIndex, shardTotal);

log.info("分片{}处理数据量: {}", shardIndex, shardData.size());

// 处理分片数据
for (String data : shardData) {
processSingleData(data);
}
}

/**
* 计算分片数据
* @param dataList 原始数据列表
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @return 分片数据
*/
private List<String> calculateShardData(List<String> dataList, int shardIndex, int shardTotal) {
List<String> shardData = new ArrayList<>();

for (int i = 0; i < dataList.size(); i++) {
// 使用取模运算分配数据到分片
if (i % shardTotal == shardIndex) {
shardData.add(dataList.get(i));
}
}

return shardData;
}

/**
* 处理单条数据
* @param data 数据
*/
private void processSingleData(String data) {
try {
// 模拟数据处理
Thread.sleep(100);
log.debug("处理数据: {}", data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("数据处理被中断", e);
}
}

/**
* 生成测试数据
* @return 数据列表
*/
private List<String> generateDataList() {
List<String> dataList = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
dataList.add("data_" + i);
}
return dataList;
}
}

3.2 数据库分片任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
* 数据库分片任务
*/
@Component
public class DatabaseShardingJob {

private static final Logger log = LoggerFactory.getLogger(DatabaseShardingJob.class);

@Autowired
private UserService userService;

/**
* 用户数据分片处理任务
*/
@XxlJob("userDataShardingJob")
public void userDataShardingJob() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

log.info("开始执行用户数据分片任务: shardIndex={}, shardTotal={}", shardIndex, shardTotal);

try {
// 获取分片数据
List<User> shardUsers = getUserDataByShard(shardIndex, shardTotal);

log.info("分片{}获取到用户数据: {}条", shardIndex, shardUsers.size());

// 处理分片数据
processUserData(shardUsers, shardIndex);

log.info("用户数据分片任务执行完成: shardIndex={}", shardIndex);

} catch (Exception e) {
log.error("用户数据分片任务执行失败: shardIndex={}", shardIndex, e);
XxlJobHelper.handleFail("用户数据分片任务执行失败: " + e.getMessage());
}
}

/**
* 根据分片获取用户数据
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @return 用户数据列表
*/
private List<User> getUserDataByShard(int shardIndex, int shardTotal) {
// 使用分片参数查询数据库
return userService.getUsersByShard(shardIndex, shardTotal);
}

/**
* 处理用户数据
* @param users 用户列表
* @param shardIndex 分片序号
*/
private void processUserData(List<User> users, int shardIndex) {
for (User user : users) {
try {
// 处理用户数据
processUser(user);

// 更新处理状态
userService.updateUserProcessStatus(user.getId(), "PROCESSED");

} catch (Exception e) {
log.error("处理用户数据失败: userId={}, shardIndex={}", user.getId(), shardIndex, e);

// 更新失败状态
userService.updateUserProcessStatus(user.getId(), "FAILED");
}
}
}

/**
* 处理单个用户
* @param user 用户
*/
private void processUser(User user) {
// 模拟用户数据处理
try {
Thread.sleep(50);
log.debug("处理用户: userId={}, username={}", user.getId(), user.getUsername());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("用户处理被中断", e);
}
}
}

/**
* 用户服务
*/
@Service
public class UserService {

@Autowired
private UserMapper userMapper;

/**
* 根据分片获取用户数据
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @return 用户列表
*/
public List<User> getUsersByShard(int shardIndex, int shardTotal) {
return userMapper.selectUsersByShard(shardIndex, shardTotal);
}

/**
* 更新用户处理状态
* @param userId 用户ID
* @param status 状态
*/
public void updateUserProcessStatus(Long userId, String status) {
userMapper.updateProcessStatus(userId, status);
}
}

/**
* 用户实体
*/
@Data
public class User {
private Long id;
private String username;
private String email;
private String status;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

4. 高级分片策略

4.1 动态分片任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
/**
* 动态分片任务
*/
@Component
public class DynamicShardingJob {

private static final Logger log = LoggerFactory.getLogger(DynamicShardingJob.class);

@Autowired
private TaskDataService taskDataService;

/**
* 动态分片任务
*/
@XxlJob("dynamicShardingJob")
public void dynamicShardingJob() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

log.info("开始执行动态分片任务: shardIndex={}, shardTotal={}", shardIndex, shardTotal);

try {
// 获取任务参数
String jobParam = XxlJobHelper.getJobParam();
TaskConfig taskConfig = parseTaskConfig(jobParam);

// 动态计算分片数据
List<TaskData> shardData = getDynamicShardData(taskConfig, shardIndex, shardTotal);

log.info("分片{}获取到动态数据: {}条", shardIndex, shardData.size());

// 处理分片数据
processDynamicData(shardData, shardIndex, taskConfig);

log.info("动态分片任务执行完成: shardIndex={}", shardIndex);

} catch (Exception e) {
log.error("动态分片任务执行失败: shardIndex={}", shardIndex, e);
XxlJobHelper.handleFail("动态分片任务执行失败: " + e.getMessage());
}
}

/**
* 解析任务配置
* @param jobParam 任务参数
* @return 任务配置
*/
private TaskConfig parseTaskConfig(String jobParam) {
try {
if (StringUtils.hasText(jobParam)) {
return JSON.parseObject(jobParam, TaskConfig.class);
}
} catch (Exception e) {
log.warn("解析任务参数失败,使用默认配置: {}", e.getMessage());
}

return new TaskConfig(); // 使用默认配置
}

/**
* 获取动态分片数据
* @param taskConfig 任务配置
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @return 分片数据
*/
private List<TaskData> getDynamicShardData(TaskConfig taskConfig, int shardIndex, int shardTotal) {
// 根据配置动态获取数据
List<TaskData> allData = taskDataService.getTaskDataByConfig(taskConfig);

// 计算分片数据
return calculateDynamicShardData(allData, shardIndex, shardTotal, taskConfig);
}

/**
* 计算动态分片数据
* @param allData 所有数据
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @param taskConfig 任务配置
* @return 分片数据
*/
private List<TaskData> calculateDynamicShardData(List<TaskData> allData, int shardIndex,
int shardTotal, TaskConfig taskConfig) {
List<TaskData> shardData = new ArrayList<>();

for (int i = 0; i < allData.size(); i++) {
TaskData data = allData.get(i);

// 根据配置的分片策略计算分片
int targetShard = calculateTargetShard(data, shardTotal, taskConfig);

if (targetShard == shardIndex) {
shardData.add(data);
}
}

return shardData;
}

/**
* 计算目标分片
* @param data 数据
* @param shardTotal 分片总数
* @param taskConfig 任务配置
* @return 目标分片序号
*/
private int calculateTargetShard(TaskData data, int shardTotal, TaskConfig taskConfig) {
switch (taskConfig.getShardingStrategy()) {
case "hash":
// 基于哈希的分片策略
return Math.abs(data.getId().hashCode()) % shardTotal;

case "range":
// 基于范围的分片策略
return (int) (data.getId() % shardTotal);

case "round":
// 轮询分片策略
return (int) (data.getId() % shardTotal);

default:
// 默认使用哈希策略
return Math.abs(data.getId().hashCode()) % shardTotal;
}
}

/**
* 处理动态数据
* @param shardData 分片数据
* @param shardIndex 分片序号
* @param taskConfig 任务配置
*/
private void processDynamicData(List<TaskData> shardData, int shardIndex, TaskConfig taskConfig) {
for (TaskData data : shardData) {
try {
// 根据配置处理数据
processTaskData(data, taskConfig);

// 更新处理状态
taskDataService.updateProcessStatus(data.getId(), "PROCESSED");

} catch (Exception e) {
log.error("处理任务数据失败: dataId={}, shardIndex={}", data.getId(), shardIndex, e);

// 更新失败状态
taskDataService.updateProcessStatus(data.getId(), "FAILED");
}
}
}

/**
* 处理任务数据
* @param data 数据
* @param taskConfig 任务配置
*/
private void processTaskData(TaskData data, TaskConfig taskConfig) {
// 根据配置处理数据
try {
Thread.sleep(taskConfig.getProcessDelay());
log.debug("处理任务数据: dataId={}, type={}", data.getId(), data.getType());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务处理被中断", e);
}
}
}

/**
* 任务配置
*/
@Data
public class TaskConfig {
private String shardingStrategy = "hash"; // 分片策略
private int processDelay = 100; // 处理延迟
private String dataType = "default"; // 数据类型
private Map<String, Object> extraParams = new HashMap<>(); // 额外参数
}

/**
* 任务数据
*/
@Data
public class TaskData {
private Long id;
private String type;
private String content;
private String status;
private LocalDateTime createTime;
}

4.2 分片任务协调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/**
* 分片任务协调器
*/
@Component
public class ShardingTaskCoordinator {

private static final Logger log = LoggerFactory.getLogger(ShardingTaskCoordinator.class);

@Autowired
private RedisTemplate<String, String> redisTemplate;

private static final String COORDINATION_PREFIX = "sharding:coordination:";

/**
* 开始分片任务协调
* @param taskId 任务ID
* @param shardTotal 分片总数
*/
public void startTaskCoordination(String taskId, int shardTotal) {
String coordinationKey = COORDINATION_PREFIX + taskId;

// 初始化协调信息
TaskCoordinationInfo coordinationInfo = new TaskCoordinationInfo();
coordinationInfo.setTaskId(taskId);
coordinationInfo.setShardTotal(shardTotal);
coordinationInfo.setCompletedShards(0);
coordinationInfo.setStartTime(LocalDateTime.now());
coordinationInfo.setStatus("RUNNING");

// 存储协调信息
redisTemplate.opsForValue().set(coordinationKey,
JSON.toJSONString(coordinationInfo), Duration.ofHours(1));

log.info("开始分片任务协调: taskId={}, shardTotal={}", taskId, shardTotal);
}

/**
* 完成分片任务
* @param taskId 任务ID
* @param shardIndex 分片序号
*/
public void completeShardTask(String taskId, int shardIndex) {
String coordinationKey = COORDINATION_PREFIX + taskId;

try {
// 获取协调信息
String coordinationInfoStr = redisTemplate.opsForValue().get(coordinationKey);
if (coordinationInfoStr == null) {
log.warn("协调信息不存在: taskId={}", taskId);
return;
}

TaskCoordinationInfo coordinationInfo = JSON.parseObject(coordinationInfoStr, TaskCoordinationInfo.class);

// 更新完成分片数
coordinationInfo.setCompletedShards(coordinationInfo.getCompletedShards() + 1);

// 检查是否所有分片都完成
if (coordinationInfo.getCompletedShards() >= coordinationInfo.getShardTotal()) {
coordinationInfo.setStatus("COMPLETED");
coordinationInfo.setEndTime(LocalDateTime.now());

log.info("分片任务全部完成: taskId={}, totalShards={}",
taskId, coordinationInfo.getShardTotal());

// 执行后处理
executePostProcessing(taskId, coordinationInfo);
}

// 更新协调信息
redisTemplate.opsForValue().set(coordinationKey,
JSON.toJSONString(coordinationInfo), Duration.ofHours(1));

log.info("分片任务完成: taskId={}, shardIndex={}, completedShards={}/{}",
taskId, shardIndex, coordinationInfo.getCompletedShards(), coordinationInfo.getShardTotal());

} catch (Exception e) {
log.error("完成分片任务异常: taskId={}, shardIndex={}", taskId, shardIndex, e);
}
}

/**
* 执行后处理
* @param taskId 任务ID
* @param coordinationInfo 协调信息
*/
private void executePostProcessing(String taskId, TaskCoordinationInfo coordinationInfo) {
try {
log.info("开始执行分片任务后处理: taskId={}", taskId);

// 这里可以实现后处理逻辑
// 例如:数据汇总、结果合并、通知等

Thread.sleep(1000); // 模拟后处理

log.info("分片任务后处理完成: taskId={}", taskId);

} catch (Exception e) {
log.error("分片任务后处理失败: taskId={}", taskId, e);
}
}

/**
* 获取任务协调状态
* @param taskId 任务ID
* @return 协调信息
*/
public TaskCoordinationInfo getTaskCoordinationStatus(String taskId) {
String coordinationKey = COORDINATION_PREFIX + taskId;

try {
String coordinationInfoStr = redisTemplate.opsForValue().get(coordinationKey);
if (coordinationInfoStr != null) {
return JSON.parseObject(coordinationInfoStr, TaskCoordinationInfo.class);
}
} catch (Exception e) {
log.error("获取任务协调状态异常: taskId={}", taskId, e);
}

return null;
}
}

/**
* 任务协调信息
*/
@Data
public class TaskCoordinationInfo {
private String taskId;
private int shardTotal;
private int completedShards;
private String status;
private LocalDateTime startTime;
private LocalDateTime endTime;
}

5. 分片任务监控

5.1 分片任务监控器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 分片任务监控器
*/
@Component
public class ShardingTaskMonitor {

private static final Logger log = LoggerFactory.getLogger(ShardingTaskMonitor.class);

@Autowired
private ShardingTaskCoordinator shardingTaskCoordinator;

private final MeterRegistry meterRegistry;
private final Counter shardTaskCounter;
private final Timer shardTaskTimer;

public ShardingTaskMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.shardTaskCounter = Counter.builder("sharding.task.total")
.description("分片任务总数")
.register(meterRegistry);
this.shardTaskTimer = Timer.builder("sharding.task.duration")
.description("分片任务执行时间")
.register(meterRegistry);
}

/**
* 监控分片任务执行
* @param taskId 任务ID
* @param shardIndex 分片序号
* @param taskExecution 任务执行逻辑
*/
public void monitorShardTask(String taskId, int shardIndex, Runnable taskExecution) {
Timer.Sample sample = Timer.start(meterRegistry);

try {
log.info("开始监控分片任务: taskId={}, shardIndex={}", taskId, shardIndex);

// 执行任务
taskExecution.run();

// 记录成功
shardTaskCounter.increment();

// 完成分片任务
shardingTaskCoordinator.completeShardTask(taskId, shardIndex);

log.info("分片任务执行成功: taskId={}, shardIndex={}", taskId, shardIndex);

} catch (Exception e) {
log.error("分片任务执行失败: taskId={}, shardIndex={}", taskId, shardIndex, e);
throw e;
} finally {
// 记录执行时间
sample.stop(shardTaskTimer);
}
}

/**
* 获取分片任务统计
* @return 统计信息
*/
public ShardingTaskStatistics getShardingTaskStatistics() {
ShardingTaskStatistics statistics = new ShardingTaskStatistics();

statistics.setTotalTasks(shardTaskCounter.count());
statistics.setAverageExecutionTime(shardTaskTimer.mean(TimeUnit.MILLISECONDS));
statistics.setMaxExecutionTime(shardTaskTimer.max(TimeUnit.MILLISECONDS));
statistics.setMinExecutionTime(shardTaskTimer.min(TimeUnit.MILLISECONDS));

return statistics;
}
}

/**
* 分片任务统计信息
*/
@Data
public class ShardingTaskStatistics {
private double totalTasks;
private double averageExecutionTime;
private double maxExecutionTime;
private double minExecutionTime;
private LocalDateTime timestamp;

public ShardingTaskStatistics() {
this.timestamp = LocalDateTime.now();
}
}

6. 实际应用示例

6.1 数据同步分片任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
* 数据同步分片任务
*/
@Component
public class DataSyncShardingJob {

private static final Logger log = LoggerFactory.getLogger(DataSyncShardingJob.class);

@Autowired
private DataSyncService dataSyncService;

@Autowired
private ShardingTaskCoordinator shardingTaskCoordinator;

@Autowired
private ShardingTaskMonitor shardingTaskMonitor;

/**
* 数据同步分片任务
*/
@XxlJob("dataSyncShardingJob")
public void dataSyncShardingJob() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

String taskId = "data-sync-" + System.currentTimeMillis();

log.info("开始执行数据同步分片任务: taskId={}, shardIndex={}, shardTotal={}",
taskId, shardIndex, shardTotal);

// 开始任务协调
shardingTaskCoordinator.startTaskCoordination(taskId, shardTotal);

// 监控任务执行
shardingTaskMonitor.monitorShardTask(taskId, shardIndex, () -> {
try {
// 获取分片数据
List<SyncData> shardData = dataSyncService.getSyncDataByShard(shardIndex, shardTotal);

log.info("分片{}获取到同步数据: {}条", shardIndex, shardData.size());

// 同步数据
syncData(shardData, shardIndex);

} catch (Exception e) {
log.error("数据同步分片任务执行失败: taskId={}, shardIndex={}", taskId, shardIndex, e);
XxlJobHelper.handleFail("数据同步分片任务执行失败: " + e.getMessage());
}
});
}

/**
* 同步数据
* @param syncDataList 同步数据列表
* @param shardIndex 分片序号
*/
private void syncData(List<SyncData> syncDataList, int shardIndex) {
for (SyncData syncData : syncDataList) {
try {
// 同步单条数据
dataSyncService.syncSingleData(syncData);

log.debug("同步数据成功: dataId={}, shardIndex={}", syncData.getId(), shardIndex);

} catch (Exception e) {
log.error("同步数据失败: dataId={}, shardIndex={}", syncData.getId(), shardIndex, e);

// 记录失败数据
dataSyncService.recordFailedSync(syncData.getId(), e.getMessage());
}
}
}
}

/**
* 数据同步服务
*/
@Service
public class DataSyncService {

@Autowired
private SyncDataMapper syncDataMapper;

/**
* 根据分片获取同步数据
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @return 同步数据列表
*/
public List<SyncData> getSyncDataByShard(int shardIndex, int shardTotal) {
return syncDataMapper.selectSyncDataByShard(shardIndex, shardTotal);
}

/**
* 同步单条数据
* @param syncData 同步数据
*/
public void syncSingleData(SyncData syncData) {
// 模拟数据同步
try {
Thread.sleep(100);
log.debug("同步数据: dataId={}", syncData.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("数据同步被中断", e);
}
}

/**
* 记录失败同步
* @param dataId 数据ID
* @param errorMessage 错误信息
*/
public void recordFailedSync(Long dataId, String errorMessage) {
syncDataMapper.updateSyncStatus(dataId, "FAILED", errorMessage);
}
}

/**
* 同步数据实体
*/
@Data
public class SyncData {
private Long id;
private String sourceTable;
private String targetTable;
private String data;
private String status;
private LocalDateTime createTime;
}

7. 总结

通过XXL-JOB的分片机制,我们成功构建了一个高效的多实例并行处理系统。关键特性包括:

7.1 核心优势

  1. 并行处理: 多实例同时处理不同分片,提升处理效率
  2. 负载均衡: 自动分配任务到不同执行器实例
  3. 故障隔离: 单个分片失败不影响其他分片
  4. 动态扩展: 支持动态调整分片数量
  5. 监控告警: 完善的任务监控和状态跟踪

7.2 最佳实践

  1. 分片策略: 根据数据特点选择合适的分片策略
  2. 任务协调: 使用协调器管理分片任务状态
  3. 异常处理: 完善的异常处理和重试机制
  4. 监控告警: 实时监控任务执行状态
  5. 资源管理: 合理控制分片数量和资源使用

这套分片任务方案不仅能够显著提升任务处理效率,还为大规模数据处理提供了可靠的解决方案。