
|
@Component public class DynamicShardingJob { private static final Logger log = LoggerFactory.getLogger(DynamicShardingJob.class); @Autowired private TaskDataService taskDataService;
@XxlJob("dynamicShardingJob") public void dynamicShardingJob() { int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); log.info("开始执行动态分片任务: shardIndex={}, shardTotal={}", shardIndex, shardTotal); try { String jobParam = XxlJobHelper.getJobParam(); TaskConfig taskConfig = parseTaskConfig(jobParam); List<TaskData> shardData = getDynamicShardData(taskConfig, shardIndex, shardTotal); log.info("分片{}获取到动态数据: {}条", shardIndex, shardData.size()); processDynamicData(shardData, shardIndex, taskConfig); log.info("动态分片任务执行完成: shardIndex={}", shardIndex); } catch (Exception e) { log.error("动态分片任务执行失败: shardIndex={}", shardIndex, e); XxlJobHelper.handleFail("动态分片任务执行失败: " + e.getMessage()); } }
private TaskConfig parseTaskConfig(String jobParam) { try { if (StringUtils.hasText(jobParam)) { return JSON.parseObject(jobParam, TaskConfig.class); } } catch (Exception e) { log.warn("解析任务参数失败,使用默认配置: {}", e.getMessage()); } return new TaskConfig(); }
private List<TaskData> getDynamicShardData(TaskConfig taskConfig, int shardIndex, int shardTotal) { List<TaskData> allData = taskDataService.getTaskDataByConfig(taskConfig); return calculateDynamicShardData(allData, shardIndex, shardTotal, taskConfig); }
private List<TaskData> calculateDynamicShardData(List<TaskData> allData, int shardIndex, int shardTotal, TaskConfig taskConfig) { List<TaskData> shardData = new ArrayList<>(); for (int i = 0; i < allData.size(); i++) { TaskData data = allData.get(i); int targetShard = calculateTargetShard(data, shardTotal, taskConfig); if (targetShard == shardIndex) { shardData.add(data); } } return shardData; }
private int calculateTargetShard(TaskData data, int shardTotal, TaskConfig taskConfig) { switch (taskConfig.getShardingStrategy()) { case "hash": return Math.abs(data.getId().hashCode()) % shardTotal; case "range": return (int) (data.getId() % shardTotal); case "round": return (int) (data.getId() % shardTotal); default: return Math.abs(data.getId().hashCode()) % shardTotal; } }
private void processDynamicData(List<TaskData> shardData, int shardIndex, TaskConfig taskConfig) { for (TaskData data : shardData) { try { processTaskData(data, taskConfig); taskDataService.updateProcessStatus(data.getId(), "PROCESSED"); } catch (Exception e) { log.error("处理任务数据失败: dataId={}, shardIndex={}", data.getId(), shardIndex, e); taskDataService.updateProcessStatus(data.getId(), "FAILED"); } } }
private void processTaskData(TaskData data, TaskConfig taskConfig) { try { Thread.sleep(taskConfig.getProcessDelay()); log.debug("处理任务数据: dataId={}, type={}", data.getId(), data.getType()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("任务处理被中断", e); } } }
@Data public class TaskConfig { private String shardingStrategy = "hash"; private int processDelay = 100; private String dataType = "default"; private Map<String, Object> extraParams = new HashMap<>(); }
@Data public class TaskData { private Long id; private String type; private String content; private String status; private LocalDateTime createTime; }
|