1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
@Service public class HorizontalTableService {
@Autowired private TableRouter tableRouter;
@Autowired private DatabaseConnectionManager connectionManager;
private final int TABLE_COUNT = 8;
public Object executeByShardKey(String shardKey, String sql, Object... params) { try { int shardIndex = calculateShardIndex(shardKey);
String tableName = getTableName(shardIndex);
String shardedSQL = replaceTableName(sql, tableName);
DataSource dataSource = connectionManager.getDataSource(0);
return executeSQL(dataSource, shardedSQL, params);
} catch (Exception e) { log.error("水平分表执行失败", e); throw new DatabaseException("水平分表执行失败", e); } }
private int calculateShardIndex(String shardKey) { return Math.abs(shardKey.hashCode()) % TABLE_COUNT; }
private String getTableName(int shardIndex) { return String.format("user_%d", shardIndex); }
private String replaceTableName(String sql, String tableName) { return sql.replaceAll("user", tableName); }
public List<Map<String, Object>> crossTableQuery(String sql, Object... params) { List<Map<String, Object>> allResults = new ArrayList<>();
IntStream.range(0, TABLE_COUNT).parallel().forEach(shardIndex -> { try { String tableName = getTableName(shardIndex); String shardedSQL = replaceTableName(sql, tableName);
DataSource dataSource = connectionManager.getDataSource(0); List<Map<String, Object>> results = executeQuery(dataSource, shardedSQL, params);
synchronized (allResults) { allResults.addAll(results); }
} catch (Exception e) { log.error("分表{}查询失败", shardIndex, e); } });
return allResults; }
private List<Map<String, Object>> executeQuery(DataSource dataSource, String sql, Object... params) { try (Connection connection = dataSource.getConnection(); PreparedStatement statement = connection.prepareStatement(sql)) {
for (int i = 0; i < params.length; i++) { statement.setObject(i + 1, params[i]); }
ResultSet resultSet = statement.executeQuery(); return convertResultSetToList(resultSet);
} catch (SQLException e) { log.error("查询执行失败", e); throw new DatabaseException("查询执行失败", e); } }
private Object executeSQL(DataSource dataSource, String sql, Object... params) { try (Connection connection = dataSource.getConnection(); PreparedStatement statement = connection.prepareStatement(sql)) {
for (int i = 0; i < params.length; i++) { statement.setObject(i + 1, params[i]); }
if (sql.trim().toUpperCase().startsWith("SELECT")) { ResultSet resultSet = statement.executeQuery(); return convertResultSetToList(resultSet); } else { return statement.executeUpdate(); }
} catch (SQLException e) { log.error("SQL执行失败", e); throw new DatabaseException("SQL执行失败", e); } }
private List<Map<String, Object>> convertResultSetToList(ResultSet resultSet) throws SQLException { List<Map<String, Object>> results = new ArrayList<>(); ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount();
while (resultSet.next()) { Map<String, Object> row = new HashMap<>(); for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnName(i); Object value = resultSet.getObject(i); row.put(columnName, value); } results.add(row); }
return results; } }
|