1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
|
@Component public class DynamicShardingJob { private static final Logger log = LoggerFactory.getLogger(DynamicShardingJob.class); @Autowired private TaskDataService taskDataService;
@XxlJob("dynamicShardingJob") public void dynamicShardingJob() { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); log.info("开始执行动态分片任务: shardIndex={}, shardTotal={}", shardIndex, shardTotal); try { String jobParam = XxlJobHelper.getJobParam(); TaskConfig taskConfig = parseTaskConfig(jobParam); List<TaskData> shardData = getDynamicShardData(taskConfig, shardIndex, shardTotal); log.info("分片{}获取到动态数据: {}条", shardIndex, shardData.size()); processDynamicData(shardData, shardIndex, taskConfig); log.info("动态分片任务执行完成: shardIndex={}", shardIndex); } catch (Exception e) { log.error("动态分片任务执行失败: shardIndex={}", shardIndex, e); XxlJobHelper.handleFail("动态分片任务执行失败: " + e.getMessage()); } }
private TaskConfig parseTaskConfig(String jobParam) { try { if (StringUtils.hasText(jobParam)) { return JSON.parseObject(jobParam, TaskConfig.class); } } catch (Exception e) { log.warn("解析任务参数失败,使用默认配置: {}", e.getMessage()); } return new TaskConfig(); }
private List<TaskData> getDynamicShardData(TaskConfig taskConfig, int shardIndex, int shardTotal) { List<TaskData> allData = taskDataService.getTaskDataByConfig(taskConfig); return calculateDynamicShardData(allData, shardIndex, shardTotal, taskConfig); }
private List<TaskData> calculateDynamicShardData(List<TaskData> allData, int shardIndex, int shardTotal, TaskConfig taskConfig) { List<TaskData> shardData = new ArrayList<>(); for (int i = 0; i < allData.size(); i++) { TaskData data = allData.get(i); int targetShard = calculateTargetShard(data, shardTotal, taskConfig); if (targetShard == shardIndex) { shardData.add(data); } } return shardData; }
private int calculateTargetShard(TaskData data, int shardTotal, TaskConfig taskConfig) { switch (taskConfig.getShardingStrategy()) { case "hash": return Math.abs(data.getId().hashCode()) % shardTotal; case "range": return (int) (data.getId() % shardTotal); case "round": return (int) (data.getId() % shardTotal); default: return Math.abs(data.getId().hashCode()) % shardTotal; } }
private void processDynamicData(List<TaskData> shardData, int shardIndex, TaskConfig taskConfig) { for (TaskData data : shardData) { try { processTaskData(data, taskConfig); taskDataService.updateProcessStatus(data.getId(), "PROCESSED"); } catch (Exception e) { log.error("处理任务数据失败: dataId={}, shardIndex={}", data.getId(), shardIndex, e); taskDataService.updateProcessStatus(data.getId(), "FAILED"); } } }
private void processTaskData(TaskData data, TaskConfig taskConfig) { try { Thread.sleep(taskConfig.getProcessDelay()); log.debug("处理任务数据: dataId={}, type={}", data.getId(), data.getType()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("任务处理被中断", e); } } }
@Data public class TaskConfig { private String shardingStrategy = "hash"; private int processDelay = 100; private String dataType = "default"; private Map<String, Object> extraParams = new HashMap<>(); }
@Data public class TaskData { private Long id; private String type; private String content; private String status; private LocalDateTime createTime; }
|