1. HBase分布式处理概述

HBase是一个分布式、可扩展的大数据存储系统,基于Hadoop和ZooKeeper构建,具有高可用性、强一致性和水平扩展能力。本文将详细介绍HBase分布式数据库的配置、数据存储、查询优化、集群管理和性能调优的完整解决方案。

1.1 核心功能

  1. 数据存储: 海量数据的分布式存储
  2. 数据查询: 高效的数据查询和检索
  3. 数据管理: 表结构管理和数据操作
  4. 集群管理: HBase集群部署和管理
  5. 性能优化: 查询性能优化和调优

1.2 技术架构

客户端    →  HBase Master  →  Region Server  →  HDFS
  ↓              ↓                ↓              ↓
数据操作  →  元数据管理    →  数据存储       →  数据持久化
  ↓              ↓                ↓              ↓
负载均衡  →  故障恢复      →  数据分片       →  数据备份

2. HBase配置

2.1 HBase配置类

HBase配置类及其配置属性类的代码如下:
/**
* HBase配置类
*/
@Configuration
public class HBaseConfig {

@Value("${hbase.zookeeper.quorum}")
private String zookeeperQuorum;

@Value("${hbase.zookeeper.property.clientPort}")
private String zookeeperPort;

@Value("${hbase.master}")
private String hbaseMaster;

@Value("${hbase.rootdir}")
private String hbaseRootDir;

/**
* HBase配置属性
*/
@Bean
public HBaseProperties hBaseProperties() {
return HBaseProperties.builder()
.zookeeperQuorum(zookeeperQuorum)
.zookeeperPort(zookeeperPort)
.hbaseMaster(hbaseMaster)
.hbaseRootDir(hbaseRootDir)
.build();
}

/**
* HBase配置
*/
@Bean
public Configuration hbaseConfiguration() {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", zookeeperQuorum);
config.set("hbase.zookeeper.property.clientPort", zookeeperPort);
config.set("hbase.master", hbaseMaster);
config.set("hbase.rootdir", hbaseRootDir);

// 连接池配置
config.set("hbase.client.retries.number", "3");
config.set("hbase.client.pause", "1000");
config.set("hbase.client.operation.timeout", "30000");
config.set("hbase.client.scanner.timeout.period", "60000");

// 批量操作配置
config.set("hbase.client.write.buffer", "2097152");
config.set("hbase.client.max.total.tasks", "100");
config.set("hbase.client.max.perserver.tasks", "5");

return config;
}

/**
* HBase连接
*/
@Bean
public Connection hbaseConnection() throws IOException {
return ConnectionFactory.createConnection(hbaseConfiguration());
}

/**
* HBase管理接口
*/
@Bean
public Admin hbaseAdmin() throws IOException {
return hbaseConnection().getAdmin();
}
}

/**
* HBase配置属性
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class HBaseProperties {
private String zookeeperQuorum;
private String zookeeperPort;
private String hbaseMaster;
private String hbaseRootDir;

// 连接配置
private int retries = 3;
private int pause = 1000;
private int operationTimeout = 30000;
private int scannerTimeout = 60000;

// 批量操作配置
private int writeBuffer = 2097152;
private int maxTotalTasks = 100;
private int maxPerServerTasks = 5;
}

2.2 应用配置

application.yml 配置示例如下:
# application.yml
# All settings live under a single top-level `hbase` key: a YAML mapping must not
# repeat a key, so the connection and table sections are merged here.
hbase:
  zookeeper:
    quorum: localhost
    property:
      clientPort: 2181
  master: localhost:16000
  rootdir: hdfs://localhost:9000/hbase

  # HBase table settings
  table:
    user-info:
      families: ["info", "profile", "settings"]
      max-versions: 3
      ttl: 86400      # 1 day
    order-data:
      families: ["order", "payment", "shipping"]
      max-versions: 1
      ttl: 2592000    # 30 x 86400

3. HBase数据模型

3.1 HBase数据模型

表结构常量类与行键生成器的代码如下:
/**
 * Table names, column families and column qualifiers used by the HBase tables.
 *
 * <p>This class and its nested classes only hold constants, so they are {@code final}
 * and have private constructors to prevent instantiation and subclassing.
 */
public final class HBaseDataModel {

    private HBaseDataModel() {
        // constants holder, not instantiable
    }

    /**
     * Constants for the user information table.
     */
    public static final class UserInfoTable {
        public static final String TABLE_NAME = "user_info";
        public static final String INFO_FAMILY = "info";
        public static final String PROFILE_FAMILY = "profile";
        public static final String SETTINGS_FAMILY = "settings";

        // Column qualifiers
        public static final String USER_ID_QUALIFIER = "user_id";
        public static final String USERNAME_QUALIFIER = "username";
        public static final String EMAIL_QUALIFIER = "email";
        public static final String PHONE_QUALIFIER = "phone";
        public static final String AVATAR_QUALIFIER = "avatar";
        public static final String GENDER_QUALIFIER = "gender";
        public static final String BIRTHDAY_QUALIFIER = "birthday";
        public static final String LOCATION_QUALIFIER = "location";
        public static final String LANGUAGE_QUALIFIER = "language";
        public static final String TIMEZONE_QUALIFIER = "timezone";
        public static final String NOTIFICATION_QUALIFIER = "notification";

        private UserInfoTable() {
            // constants holder, not instantiable
        }
    }

    /**
     * Constants for the order data table.
     */
    public static final class OrderDataTable {
        public static final String TABLE_NAME = "order_data";
        public static final String ORDER_FAMILY = "order";
        public static final String PAYMENT_FAMILY = "payment";
        public static final String SHIPPING_FAMILY = "shipping";

        // Column qualifiers
        public static final String ORDER_ID_QUALIFIER = "order_id";
        public static final String USER_ID_QUALIFIER = "user_id";
        public static final String PRODUCT_ID_QUALIFIER = "product_id";
        public static final String QUANTITY_QUALIFIER = "quantity";
        public static final String PRICE_QUALIFIER = "price";
        public static final String STATUS_QUALIFIER = "status";
        public static final String CREATE_TIME_QUALIFIER = "create_time";
        public static final String PAYMENT_METHOD_QUALIFIER = "payment_method";
        public static final String PAYMENT_STATUS_QUALIFIER = "payment_status";
        public static final String SHIPPING_ADDRESS_QUALIFIER = "shipping_address";
        public static final String SHIPPING_STATUS_QUALIFIER = "shipping_status";

        private OrderDataTable() {
            // constants holder, not instantiable
        }
    }

    /**
     * Constants for the log data table.
     */
    public static final class LogDataTable {
        public static final String TABLE_NAME = "log_data";
        public static final String LOG_FAMILY = "log";
        public static final String META_FAMILY = "meta";

        // Column qualifiers
        public static final String LOG_LEVEL_QUALIFIER = "level";
        public static final String LOG_MESSAGE_QUALIFIER = "message";
        public static final String LOG_SOURCE_QUALIFIER = "source";
        public static final String LOG_TIMESTAMP_QUALIFIER = "timestamp";
        public static final String USER_ID_QUALIFIER = "user_id";
        public static final String SESSION_ID_QUALIFIER = "session_id";
        public static final String IP_ADDRESS_QUALIFIER = "ip_address";
        public static final String USER_AGENT_QUALIFIER = "user_agent";

        private LogDataTable() {
            // constants holder, not instantiable
        }
    }
}

/**
 * Generates row keys for the user, order and log tables.
 *
 * <p>Order and log keys start with a reversed timestamp
 * ({@code Long.MAX_VALUE - timestamp}), zero-padded to 19 digits, which is the number
 * of digits in {@code Long.MAX_VALUE}. Because a larger timestamp yields a smaller
 * prefix, keys sort newest-first in lexicographic order.
 */
@Component
public class HBaseRowKeyGenerator {

    /**
     * Generates the row key of a user: {@code user_} followed by the id zero-padded to
     * ten digits.
     *
     * @param userId user id, must not be null
     * @return the row key
     * @throws NullPointerException if {@code userId} is null
     */
    public String generateUserInfoRowKey(Long userId) {
        java.util.Objects.requireNonNull(userId, "userId");
        return String.format("user_%010d", userId);
    }

    /**
     * Generates the row key of an order: reversed creation timestamp, underscore,
     * order id.
     *
     * <p>The creation time is converted to epoch milliseconds using the JVM default
     * time zone, so the key for a given {@code LocalDateTime} depends on that zone.
     *
     * @param orderId order id, must not be null
     * @param createTime creation time, must not be null
     * @return the row key
     * @throws NullPointerException if an argument is null
     * @throws IllegalArgumentException if the creation time is before the epoch
     */
    public String generateOrderDataRowKey(String orderId, LocalDateTime createTime) {
        java.util.Objects.requireNonNull(orderId, "orderId");
        java.util.Objects.requireNonNull(createTime, "createTime");
        long timestamp = createTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        return String.format("%019d_%s", reverseTimestamp(timestamp), orderId);
    }

    /**
     * Generates the row key of a log entry: reversed timestamp, underscore, user id
     * zero-padded to ten digits, underscore, log id.
     *
     * @param timestamp timestamp of the entry, must not be negative
     * @param userId user id, must not be null
     * @param logId log id, must not be null
     * @return the row key
     * @throws NullPointerException if {@code userId} or {@code logId} is null
     * @throws IllegalArgumentException if {@code timestamp} is negative
     */
    public String generateLogDataRowKey(long timestamp, Long userId, String logId) {
        java.util.Objects.requireNonNull(userId, "userId");
        java.util.Objects.requireNonNull(logId, "logId");
        return String.format("%019d_%010d_%s", reverseTimestamp(timestamp), userId, logId);
    }

    /**
     * Computes {@code Long.MAX_VALUE - timestamp}.
     *
     * @param timestamp non-negative timestamp
     * @return the reversed timestamp, always in {@code [0, Long.MAX_VALUE]}
     * @throws IllegalArgumentException if {@code timestamp} is negative, because the
     *         subtraction would overflow to a negative value and break key ordering
     */
    private long reverseTimestamp(long timestamp) {
        if (timestamp < 0) {
            throw new IllegalArgumentException("timestamp must not be negative: " + timestamp);
        }
        return Long.MAX_VALUE - timestamp;
    }
}

4. HBase数据操作服务

4.1 HBase数据操作服务

HBase数据操作服务的代码如下:
/**
 * Data-access service over the HBase client: table creation and deletion plus
 * single-row, batch and scan operations.
 *
 * <p>Every method catches failures, logs them and reports them through its return
 * value ({@code false}, {@code null} or an empty result) instead of throwing.
 * {@code Table} and {@code ResultScanner} handles are opened with try-with-resources
 * so they are closed even when an operation fails.
 *
 * <p>NOTE(review): {@code log} is not declared in this class; presumably it is meant to
 * be supplied by Lombok's {@code @Slf4j} — confirm and add the annotation.
 */
@Service
public class HBaseDataService {

    /** Versions kept per cell in every column family created by {@link #createTable}. */
    private static final int DEFAULT_MAX_VERSIONS = 3;

    /** Time-to-live (one day) of every column family created by {@link #createTable}. */
    private static final int DEFAULT_TTL = 86400;

    private final Connection connection;
    private final Admin admin;
    private final HBaseRowKeyGenerator rowKeyGenerator;

    public HBaseDataService(Connection connection, Admin admin, HBaseRowKeyGenerator rowKeyGenerator) {
        this.connection = connection;
        this.admin = admin;
        this.rowKeyGenerator = rowKeyGenerator;
    }

    /**
     * Creates a table with the given column families. Every family gets the same
     * fixed settings: three versions, one-day TTL, SNAPPY compression and a ROW
     * bloom filter.
     *
     * @param tableName table name
     * @param columnFamilies names of the column families to create
     * @return {@code true} if the table was created; {@code false} if it already
     *         exists or creation failed
     */
    public boolean createTable(String tableName, List<String> columnFamilies) {
        try {
            TableName name = TableName.valueOf(tableName);
            if (admin.tableExists(name)) {
                log.warn("表已存在: {}", tableName);
                return false;
            }

            TableDescriptorBuilder tableBuilder = TableDescriptorBuilder.newBuilder(name);
            for (String columnFamily : columnFamilies) {
                ColumnFamilyDescriptorBuilder familyBuilder =
                        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
                familyBuilder.setMaxVersions(DEFAULT_MAX_VERSIONS);
                familyBuilder.setTimeToLive(DEFAULT_TTL);
                familyBuilder.setCompressionType(Compression.Algorithm.SNAPPY);
                familyBuilder.setBloomFilterType(BloomType.ROW);
                tableBuilder.setColumnFamily(familyBuilder.build());
            }

            admin.createTable(tableBuilder.build());
            log.info("创建表成功: {}", tableName);
            return true;
        } catch (Exception e) {
            log.error("创建表失败: {}", tableName, e);
            return false;
        }
    }

    /**
     * Deletes a table, disabling it first when it is still enabled.
     *
     * @param tableName table name
     * @return {@code true} if the table was deleted; {@code false} if it does not
     *         exist or deletion failed
     */
    public boolean deleteTable(String tableName) {
        try {
            TableName name = TableName.valueOf(tableName);
            if (!admin.tableExists(name)) {
                log.warn("表不存在: {}", tableName);
                return false;
            }

            // Disabling an already-disabled table fails, which previously made such a
            // table impossible to delete through this method.
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);

            log.info("删除表成功: {}", tableName);
            return true;
        } catch (Exception e) {
            log.error("删除表失败: {}", tableName, e);
            return false;
        }
    }

    /**
     * Writes one cell, stamped with the current client time.
     *
     * @param tableName table name
     * @param rowKey row key
     * @param columnFamily column family
     * @param columnQualifier column qualifier
     * @param value cell value
     * @return {@code true} on success, {@code false} on failure
     */
    public boolean putData(String tableName, String rowKey, String columnFamily,
                           String columnQualifier, String value) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier),
                    System.currentTimeMillis(), Bytes.toBytes(value));
            table.put(put);

            log.debug("插入数据成功: table={}, rowKey={}, column={}:{}",
                    tableName, rowKey, columnFamily, columnQualifier);
            return true;
        } catch (Exception e) {
            log.error("插入数据失败: table={}, rowKey={}", tableName, rowKey, e);
            return false;
        }
    }

    /**
     * Submits a list of puts as one batch call.
     *
     * @param tableName table name
     * @param puts puts to submit
     * @return {@code true} on success, {@code false} on failure or interruption
     */
    public boolean batchPutData(String tableName, List<Put> puts) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Object[] results = new Object[puts.size()];
            table.batch(puts, results);

            log.info("批量插入数据成功: table={}, count={}", tableName, puts.size());
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            log.error("批量插入数据失败: table={}", tableName, e);
            return false;
        } catch (Exception e) {
            log.error("批量插入数据失败: table={}", tableName, e);
            return false;
        }
    }

    /**
     * Reads one row.
     *
     * @param tableName table name
     * @param rowKey row key
     * @return the result of the get, or {@code null} if the read failed
     */
    public Result getData(String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(new Get(Bytes.toBytes(rowKey)));

            log.debug("查询数据成功: table={}, rowKey={}", tableName, rowKey);
            return result;
        } catch (Exception e) {
            log.error("查询数据失败: table={}, rowKey={}", tableName, rowKey, e);
            return null;
        }
    }

    /**
     * Reads several rows in one call.
     *
     * @param tableName table name
     * @param rowKeys row keys to read
     * @return one result per requested key, or an empty array if the read failed
     */
    public Result[] batchGetData(String tableName, List<String> rowKeys) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            List<Get> gets = rowKeys.stream()
                    .map(rowKey -> new Get(Bytes.toBytes(rowKey)))
                    .collect(Collectors.toList());
            Result[] results = table.get(gets);

            log.debug("批量查询数据成功: table={}, count={}", tableName, rowKeys.size());
            return results;
        } catch (Exception e) {
            log.error("批量查询数据失败: table={}", tableName, e);
            return new Result[0];
        }
    }

    /**
     * Scans a key range and returns at most {@code maxRows} rows.
     *
     * <p>The row cap is applied with {@code Scan.setLimit}. The previous code passed
     * {@code maxRows} to {@code Scan.setMaxResultSize}, which is a size budget in
     * bytes rather than a row count.
     *
     * @param tableName table name
     * @param startRow first row of the range, or {@code null} to start at the beginning
     * @param stopRow end of the range, or {@code null} to scan to the end
     * @param maxRows maximum number of rows to return; a value of zero or less yields
     *        an empty list
     * @return the scanned rows, or an empty list if the scan failed
     */
    public List<Result> scanData(String tableName, String startRow, String stopRow, int maxRows) {
        if (maxRows <= 0) {
            return new ArrayList<>();
        }

        Scan scan = new Scan();
        if (startRow != null) {
            scan.withStartRow(Bytes.toBytes(startRow));
        }
        if (stopRow != null) {
            scan.withStopRow(Bytes.toBytes(stopRow));
        }
        scan.setLimit(maxRows);

        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(scan)) {
            List<Result> results = new ArrayList<>();
            for (Result result : scanner) {
                results.add(result);
                // Client-side guard kept alongside the scan limit.
                if (results.size() >= maxRows) {
                    break;
                }
            }

            log.debug("扫描数据成功: table={}, count={}", tableName, results.size());
            return results;
        } catch (Exception e) {
            log.error("扫描数据失败: table={}", tableName, e);
            return new ArrayList<>();
        }
    }

    /**
     * Deletes a whole row.
     *
     * @param tableName table name
     * @param rowKey row key
     * @return {@code true} on success, {@code false} on failure
     */
    public boolean deleteData(String tableName, String rowKey) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(new Delete(Bytes.toBytes(rowKey)));

            log.debug("删除数据成功: table={}, rowKey={}", tableName, rowKey);
            return true;
        } catch (Exception e) {
            log.error("删除数据失败: table={}, rowKey={}", tableName, rowKey, e);
            return false;
        }
    }

    /**
     * Deletes one column of a row.
     *
     * @param tableName table name
     * @param rowKey row key
     * @param columnFamily column family
     * @param columnQualifier column qualifier
     * @return {@code true} on success, {@code false} on failure
     */
    public boolean deleteColumnData(String tableName, String rowKey, String columnFamily, String columnQualifier) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
            table.delete(delete);

            log.debug("删除列数据成功: table={}, rowKey={}, column={}:{}",
                    tableName, rowKey, columnFamily, columnQualifier);
            return true;
        } catch (Exception e) {
            log.error("删除列数据失败: table={}, rowKey={}", tableName, rowKey, e);
            return false;
        }
    }
}

5. HBase业务服务

5.1 用户信息服务

用户信息服务及用户信息实体的代码如下:
/**
 * Stores and loads {@link UserInfo} records in the user information table.
 *
 * <p>A user is one HBase row. All columns of a save or update go into a single
 * {@code Put} carrying one timestamp, so the row is written as one mutation rather
 * than as a batch of independent single-column puts.
 *
 * <p>NOTE(review): {@code log} is not declared in this class; presumably it is meant to
 * be supplied by Lombok's {@code @Slf4j} — confirm and add the annotation.
 */
@Service
public class UserInfoService {

    private final HBaseDataService hBaseDataService;
    private final HBaseRowKeyGenerator rowKeyGenerator;

    public UserInfoService(HBaseDataService hBaseDataService, HBaseRowKeyGenerator rowKeyGenerator) {
        this.hBaseDataService = hBaseDataService;
        this.rowKeyGenerator = rowKeyGenerator;
    }

    /**
     * Saves a user. The user id is always written; every other field is written only
     * when it is non-null.
     *
     * @param userInfo user to save; it and its user id must not be null
     * @return {@code true} if the row was written, {@code false} otherwise
     */
    public boolean saveUserInfo(UserInfo userInfo) {
        if (userInfo == null || userInfo.getUserId() == null) {
            log.warn("保存用户信息失败: 用户信息或用户ID为空");
            return false;
        }
        try {
            String rowKey = rowKeyGenerator.generateUserInfoRowKey(userInfo.getUserId());
            long timestamp = System.currentTimeMillis();
            Put put = new Put(Bytes.toBytes(rowKey));

            // Basic information
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.INFO_FAMILY,
                    HBaseDataModel.UserInfoTable.USER_ID_QUALIFIER, userInfo.getUserId());
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.INFO_FAMILY,
                    HBaseDataModel.UserInfoTable.USERNAME_QUALIFIER, userInfo.getUsername());
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.INFO_FAMILY,
                    HBaseDataModel.UserInfoTable.EMAIL_QUALIFIER, userInfo.getEmail());
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.INFO_FAMILY,
                    HBaseDataModel.UserInfoTable.PHONE_QUALIFIER, userInfo.getPhone());

            // Profile
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.PROFILE_FAMILY,
                    HBaseDataModel.UserInfoTable.AVATAR_QUALIFIER, userInfo.getAvatar());
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.PROFILE_FAMILY,
                    HBaseDataModel.UserInfoTable.GENDER_QUALIFIER, userInfo.getGender());
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.PROFILE_FAMILY,
                    HBaseDataModel.UserInfoTable.BIRTHDAY_QUALIFIER, userInfo.getBirthday());
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.PROFILE_FAMILY,
                    HBaseDataModel.UserInfoTable.LOCATION_QUALIFIER, userInfo.getLocation());

            // Settings
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.SETTINGS_FAMILY,
                    HBaseDataModel.UserInfoTable.LANGUAGE_QUALIFIER, userInfo.getLanguage());
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.SETTINGS_FAMILY,
                    HBaseDataModel.UserInfoTable.TIMEZONE_QUALIFIER, userInfo.getTimezone());
            addColumn(put, timestamp, HBaseDataModel.UserInfoTable.SETTINGS_FAMILY,
                    HBaseDataModel.UserInfoTable.NOTIFICATION_QUALIFIER, userInfo.getNotification());

            return writeRow(put);
        } catch (Exception e) {
            log.error("保存用户信息失败: userId={}", userInfo.getUserId(), e);
            return false;
        }
    }

    /**
     * Loads a user.
     *
     * @param userId user id
     * @return the user, or {@code null} if the row is missing, the read failed or the
     *         row could not be parsed
     */
    public UserInfo getUserInfo(Long userId) {
        try {
            String rowKey = rowKeyGenerator.generateUserInfoRowKey(userId);
            Result result = hBaseDataService.getData(HBaseDataModel.UserInfoTable.TABLE_NAME, rowKey);

            // getData returns null when the read itself failed; it has already logged why.
            if (result == null || result.isEmpty()) {
                return null;
            }
            return parseUserInfo(result);
        } catch (Exception e) {
            log.error("获取用户信息失败: userId={}", userId, e);
            return null;
        }
    }

    /**
     * Updates individual columns of a user.
     *
     * <p>Each key must have the form {@code family:qualifier}. Entries with any other
     * key shape, or with a null value, are skipped and logged.
     *
     * @param userId user id
     * @param updates new values keyed by {@code family:qualifier}
     * @return {@code true} if the columns were written or {@code updates} was empty;
     *         {@code false} if an argument is null, no entry was usable, or the write
     *         failed
     */
    public boolean updateUserInfo(Long userId, Map<String, String> updates) {
        if (userId == null || updates == null) {
            log.warn("更新用户信息失败: 用户ID或更新字段为空");
            return false;
        }
        try {
            String rowKey = rowKeyGenerator.generateUserInfoRowKey(userId);
            long timestamp = System.currentTimeMillis();
            Put put = new Put(Bytes.toBytes(rowKey));

            for (Map.Entry<String, String> entry : updates.entrySet()) {
                String key = entry.getKey();
                String[] parts = key == null ? new String[0] : key.split(":");
                if (parts.length != 2 || entry.getValue() == null) {
                    log.warn("忽略无效的更新字段: userId={}, key={}", userId, key);
                    continue;
                }
                addColumn(put, timestamp, parts[0], parts[1], entry.getValue());
            }

            if (put.isEmpty()) {
                // A Put without columns cannot be submitted. An empty request is a
                // successful no-op; a request whose entries were all skipped is not.
                return updates.isEmpty();
            }
            return writeRow(put);
        } catch (Exception e) {
            log.error("更新用户信息失败: userId={}", userId, e);
            return false;
        }
    }

    /**
     * Deletes the row of a user.
     *
     * @param userId user id
     * @return {@code true} on success, {@code false} on failure
     */
    public boolean deleteUserInfo(Long userId) {
        try {
            String rowKey = rowKeyGenerator.generateUserInfoRowKey(userId);
            return hBaseDataService.deleteData(HBaseDataModel.UserInfoTable.TABLE_NAME, rowKey);
        } catch (Exception e) {
            log.error("删除用户信息失败: userId={}", userId, e);
            return false;
        }
    }

    /**
     * Submits a single-row put to the user information table.
     *
     * @param put the row mutation
     * @return the result reported by the data service
     */
    private boolean writeRow(Put put) {
        List<Put> puts = new ArrayList<>();
        puts.add(put);
        return hBaseDataService.batchPutData(HBaseDataModel.UserInfoTable.TABLE_NAME, puts);
    }

    /**
     * Adds one column to a put, using the string form of the value. Does nothing when
     * the value is null.
     *
     * @param put put to add the column to
     * @param timestamp cell timestamp shared by all columns of the put
     * @param columnFamily column family
     * @param columnQualifier column qualifier
     * @param value value to store, may be null
     */
    private void addColumn(Put put, long timestamp, String columnFamily, String columnQualifier, Object value) {
        if (value == null) {
            return;
        }
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier),
                timestamp, Bytes.toBytes(value.toString()));
    }

    /**
     * Converts a row into a {@link UserInfo}.
     *
     * @param result row read from the user information table
     * @return the parsed user, or {@code null} if parsing failed (for example when the
     *         user id column is missing or not a number)
     */
    private UserInfo parseUserInfo(Result result) {
        try {
            UserInfo userInfo = new UserInfo();

            // Basic information
            userInfo.setUserId(Long.parseLong(getCellValue(result, HBaseDataModel.UserInfoTable.INFO_FAMILY,
                    HBaseDataModel.UserInfoTable.USER_ID_QUALIFIER)));
            userInfo.setUsername(getCellValue(result, HBaseDataModel.UserInfoTable.INFO_FAMILY,
                    HBaseDataModel.UserInfoTable.USERNAME_QUALIFIER));
            userInfo.setEmail(getCellValue(result, HBaseDataModel.UserInfoTable.INFO_FAMILY,
                    HBaseDataModel.UserInfoTable.EMAIL_QUALIFIER));
            userInfo.setPhone(getCellValue(result, HBaseDataModel.UserInfoTable.INFO_FAMILY,
                    HBaseDataModel.UserInfoTable.PHONE_QUALIFIER));

            // Profile
            userInfo.setAvatar(getCellValue(result, HBaseDataModel.UserInfoTable.PROFILE_FAMILY,
                    HBaseDataModel.UserInfoTable.AVATAR_QUALIFIER));
            userInfo.setGender(getCellValue(result, HBaseDataModel.UserInfoTable.PROFILE_FAMILY,
                    HBaseDataModel.UserInfoTable.GENDER_QUALIFIER));

            String birthdayStr = getCellValue(result, HBaseDataModel.UserInfoTable.PROFILE_FAMILY,
                    HBaseDataModel.UserInfoTable.BIRTHDAY_QUALIFIER);
            if (birthdayStr != null) {
                userInfo.setBirthday(LocalDate.parse(birthdayStr));
            }

            userInfo.setLocation(getCellValue(result, HBaseDataModel.UserInfoTable.PROFILE_FAMILY,
                    HBaseDataModel.UserInfoTable.LOCATION_QUALIFIER));

            // Settings
            userInfo.setLanguage(getCellValue(result, HBaseDataModel.UserInfoTable.SETTINGS_FAMILY,
                    HBaseDataModel.UserInfoTable.LANGUAGE_QUALIFIER));
            userInfo.setTimezone(getCellValue(result, HBaseDataModel.UserInfoTable.SETTINGS_FAMILY,
                    HBaseDataModel.UserInfoTable.TIMEZONE_QUALIFIER));

            String notificationStr = getCellValue(result, HBaseDataModel.UserInfoTable.SETTINGS_FAMILY,
                    HBaseDataModel.UserInfoTable.NOTIFICATION_QUALIFIER);
            if (notificationStr != null) {
                userInfo.setNotification(Boolean.parseBoolean(notificationStr));
            }

            return userInfo;
        } catch (Exception e) {
            log.error("解析用户信息失败", e);
            return null;
        }
    }

    /**
     * Reads the latest value of a column as a string.
     *
     * @param result row to read from
     * @param columnFamily column family
     * @param columnQualifier column qualifier
     * @return the value, or {@code null} if the row has no such cell
     */
    private String getCellValue(Result result, String columnFamily, String columnQualifier) {
        Cell cell = result.getColumnLatestCell(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
        if (cell != null) {
            return Bytes.toString(CellUtil.cloneValue(cell));
        }
        return null;
    }
}

/**
 * User information entity read from and written to the user information table by
 * UserInfoService.
 *
 * Field order matters: Lombok's @AllArgsConstructor takes its parameters in field
 * declaration order.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserInfo {
// Stored in the "info" column family.
private Long userId;
private String username;
private String email;
private String phone;
// Stored in the "profile" column family.
private String avatar;
private String gender;
// Persisted via toString() and read back with LocalDate.parse.
private LocalDate birthday;
private String location;
// Stored in the "settings" column family.
private String language;
private String timezone;
// Persisted via toString() and read back with Boolean.parseBoolean.
private Boolean notification;
}

6. HBase集群管理

6.1 HBase集群管理服务

HBase集群管理服务及相关数据对象的代码如下:
/**
 * Cluster-level operations: cluster status, table and region listing, and table
 * maintenance (compaction, flush).
 *
 * <p>Read operations throw {@code BusinessException} on failure; maintenance
 * operations return {@code false} instead.
 *
 * <p>NOTE(review): {@code log} is not declared in this class; presumably it is meant to
 * be supplied by Lombok's {@code @Slf4j} — confirm and add the annotation.
 */
@Service
public class HBaseClusterService {

    private final Admin admin;
    private final Connection connection;

    public HBaseClusterService(Admin admin, Connection connection) {
        this.admin = admin;
        this.connection = connection;
    }

    /**
     * Reads the cluster metrics and condenses them into a {@link ClusterStatus}.
     *
     * <p>Uses the accessors that {@code ClusterMetrics} provides in the HBase 2.x
     * client: {@code getMasterName}, {@code getLiveServerMetrics},
     * {@code getDeadServerNames}, {@code getRegionCount} and {@code getRequestCount}.
     * The names called by the previous code ({@code getMasterInfo},
     * {@code getRegionServers}, {@code getDeadRegionServers}, {@code getTotalRegions},
     * {@code getTotalRequests}) are not part of that interface.
     *
     * @return the cluster status
     * @throws BusinessException if the metrics cannot be read
     */
    public ClusterStatus getClusterStatus() {
        try {
            ClusterMetrics metrics = admin.getClusterMetrics();

            ClusterStatus status = ClusterStatus.builder()
                    .clusterId(metrics.getClusterId())
                    // String form of the master's server name; null when no master is reported.
                    .masterInfo(java.util.Objects.toString(metrics.getMasterName(), null))
                    .regionServers(metrics.getLiveServerMetrics().size())
                    .deadRegionServers(metrics.getDeadServerNames().size())
                    .totalRegions(metrics.getRegionCount())
                    .totalRequests(metrics.getRequestCount())
                    .build();

            log.debug("获取集群状态成功: regionServers={}, totalRegions={}",
                    status.getRegionServers(), status.getTotalRegions());

            return status;
        } catch (Exception e) {
            log.error("获取集群状态失败", e);
            throw new BusinessException("获取集群状态失败", e);
        }
    }

    /**
     * Lists all tables with their column family names.
     *
     * @return one entry per table
     * @throws BusinessException if the tables cannot be listed
     */
    public List<TableInfo> getTables() {
        try {
            List<TableInfo> tables = new ArrayList<>();
            for (TableName tableName : admin.listTableNames()) {
                TableDescriptor descriptor = admin.getDescriptor(tableName);
                tables.add(TableInfo.builder()
                        .name(tableName.getNameAsString())
                        .families(familyNames(descriptor))
                        .build());
            }

            log.debug("获取表列表成功: count={}", tables.size());

            return tables;
        } catch (Exception e) {
            log.error("获取表列表失败", e);
            throw new BusinessException("获取表列表失败", e);
        }
    }

    /**
     * Reads the name and column families of one table.
     *
     * @param tableName table name
     * @return the table details
     * @throws BusinessException if the descriptor cannot be read
     */
    public TableDetailInfo getTableDetail(String tableName) {
        try {
            TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf(tableName));

            TableDetailInfo detailInfo = TableDetailInfo.builder()
                    .name(tableName)
                    .families(familyNames(descriptor))
                    .build();

            log.debug("获取表详细信息成功: table={}", tableName);

            return detailInfo;
        } catch (Exception e) {
            log.error("获取表详细信息失败: table={}", tableName, e);
            throw new BusinessException("获取表详细信息失败", e);
        }
    }

    /**
     * Lists the regions of a table.
     *
     * @param tableName table name
     * @return the regions of the table
     * @throws BusinessException if the regions cannot be read
     */
    public List<RegionInfo> getRegions(String tableName) {
        try {
            List<RegionInfo> regions = admin.getRegions(TableName.valueOf(tableName));

            log.debug("获取Region信息成功: table={}, count={}", tableName, regions.size());

            return regions;
        } catch (Exception e) {
            log.error("获取Region信息失败: table={}", tableName, e);
            throw new BusinessException("获取Region信息失败", e);
        }
    }

    /**
     * Requests a major compaction of a table.
     *
     * <p>NOTE(review): the HBase Admin documentation describes {@code majorCompact} as
     * asynchronous, so a {@code true} result would mean the request was accepted, not
     * that the compaction has finished — confirm and consider rewording the log line.
     *
     * @param tableName table name
     * @return {@code true} if the call succeeded, {@code false} otherwise
     */
    public boolean compactTable(String tableName) {
        try {
            admin.majorCompact(TableName.valueOf(tableName));

            log.info("压缩表成功: table={}", tableName);

            return true;
        } catch (Exception e) {
            log.error("压缩表失败: table={}", tableName, e);
            return false;
        }
    }

    /**
     * Flushes a table.
     *
     * @param tableName table name
     * @return {@code true} if the call succeeded, {@code false} otherwise
     */
    public boolean flushTable(String tableName) {
        try {
            admin.flush(TableName.valueOf(tableName));

            log.info("刷新表成功: table={}", tableName);

            return true;
        } catch (Exception e) {
            log.error("刷新表失败: table={}", tableName, e);
            return false;
        }
    }

    /**
     * Converts the column family names of a table descriptor to strings.
     *
     * @param descriptor table descriptor
     * @return the column family names
     */
    private List<String> familyNames(TableDescriptor descriptor) {
        return descriptor.getColumnFamilyNames().stream()
                .map(Bytes::toString)
                .collect(Collectors.toList());
    }
}

/**
 * Summary of the cluster state, built by HBaseClusterService.getClusterStatus from
 * the metrics returned by admin.getClusterMetrics().
 *
 * Field order matters: Lombok's @AllArgsConstructor takes its parameters in field
 * declaration order.
 *
 * NOTE(review): the HBase client has shipped a class with the same simple name
 * (org.apache.hadoop.hbase.ClusterStatus) — confirm there is no import clash.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClusterStatus {
// Cluster identifier.
private String clusterId;
// Textual description of the active master.
private String masterInfo;
// Region server count — presumably live servers only; verify against the producer.
private int regionServers;
// Count of region servers reported as dead.
private int deadRegionServers;
// Total region count.
private int totalRegions;
// Total request count.
private long totalRequests;
}

/**
 * Table summary returned by HBaseClusterService.getTables: one instance per table.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TableInfo {
// Table name, taken from TableName.getNameAsString().
private String name;
// Column family names of the table.
private List<String> families;
}

/**
 * Table details returned by HBaseClusterService.getTableDetail.
 *
 * Currently carries the same two fields as TableInfo.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TableDetailInfo {
// Table name as passed to getTableDetail.
private String name;
// Column family names of the table.
private List<String> families;
}

7. HBase控制器

7.1 HBase控制器

HBase控制器的代码如下:
/**
 * REST endpoints under {@code /hbase} for table creation, user information and
 * cluster inspection.
 *
 * <p>Every endpoint answers with a JSON map containing a {@code success} flag. A
 * completed call is answered with HTTP 200 even when {@code success} is
 * {@code false}; an exception is answered with HTTP 500 and a {@code message}.
 */
@RestController
@RequestMapping("/hbase")
public class HBaseController {

    @Autowired
    private HBaseDataService hBaseDataService;

    @Autowired
    private UserInfoService userInfoService;

    @Autowired
    private HBaseClusterService clusterService;

    /**
     * Creates a table with the given column families.
     */
    @PostMapping("/table/create")
    public ResponseEntity<Map<String, Object>> createTable(
            @RequestParam String tableName,
            @RequestBody List<String> columnFamilies) {
        try {
            boolean created = hBaseDataService.createTable(tableName, columnFamilies);

            Map<String, Object> body = new HashMap<>();
            body.put("success", created);
            body.put("tableName", tableName);
            body.put("message", created ? "创建表成功" : "创建表失败");
            return ResponseEntity.ok(body);
        } catch (Exception e) {
            log.error("创建表失败: tableName={}", tableName, e);
            return serverError("创建表失败: ", e);
        }
    }

    /**
     * Saves a user.
     */
    @PostMapping("/user/save")
    public ResponseEntity<Map<String, Object>> saveUserInfo(@RequestBody UserInfo userInfo) {
        try {
            boolean saved = userInfoService.saveUserInfo(userInfo);

            Map<String, Object> body = new HashMap<>();
            body.put("success", saved);
            body.put("userId", userInfo.getUserId());
            body.put("message", saved ? "保存用户信息成功" : "保存用户信息失败");
            return ResponseEntity.ok(body);
        } catch (Exception e) {
            log.error("保存用户信息失败: userId={}", userInfo.getUserId(), e);
            return serverError("保存用户信息失败: ", e);
        }
    }

    /**
     * Loads a user by id.
     */
    @GetMapping("/user/{userId}")
    public ResponseEntity<Map<String, Object>> getUserInfo(@PathVariable Long userId) {
        try {
            UserInfo userInfo = userInfoService.getUserInfo(userId);
            boolean found = userInfo != null;

            Map<String, Object> body = new HashMap<>();
            body.put("success", found);
            body.put("userInfo", userInfo);
            body.put("message", found ? "获取用户信息成功" : "用户信息不存在");
            return ResponseEntity.ok(body);
        } catch (Exception e) {
            log.error("获取用户信息失败: userId={}", userId, e);
            return serverError("获取用户信息失败: ", e);
        }
    }

    /**
     * Returns the cluster status.
     */
    @GetMapping("/cluster/status")
    public ResponseEntity<Map<String, Object>> getClusterStatus() {
        try {
            Map<String, Object> body = new HashMap<>();
            body.put("success", true);
            body.put("status", clusterService.getClusterStatus());
            return ResponseEntity.ok(body);
        } catch (Exception e) {
            log.error("获取集群状态失败", e);
            return serverError("获取集群状态失败: ", e);
        }
    }

    /**
     * Lists all tables.
     */
    @GetMapping("/tables")
    public ResponseEntity<Map<String, Object>> getTables() {
        try {
            Map<String, Object> body = new HashMap<>();
            body.put("success", true);
            body.put("tables", clusterService.getTables());
            return ResponseEntity.ok(body);
        } catch (Exception e) {
            log.error("获取表列表失败", e);
            return serverError("获取表列表失败: ", e);
        }
    }

    /**
     * Builds the HTTP 500 answer shared by all endpoints: {@code success=false} and a
     * message made of the given prefix followed by the exception message.
     */
    private static ResponseEntity<Map<String, Object>> serverError(String messagePrefix, Exception cause) {
        Map<String, Object> body = new HashMap<>();
        body.put("success", false);
        body.put("message", messagePrefix + cause.getMessage());
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(body);
    }
}

8. 总结

通过HBase分布式处理的实现,我们成功构建了一个完整的大数据存储系统。关键特性包括:

8.1 核心优势

  1. 数据存储: 海量数据的分布式存储
  2. 数据查询: 高效的数据查询和检索
  3. 数据管理: 表结构管理和数据操作
  4. 集群管理: HBase集群部署和管理
  5. 性能优化: 查询性能优化和调优

8.2 最佳实践

  1. 行键设计: 合理的行键设计策略
  2. 列族设计: 优化的列族结构设计
  3. 批量操作: 高效的批量数据操作
  4. 压缩优化: 数据压缩和存储优化
  5. 监控管理: 全面的集群监控和管理

这套HBase分布式处理方案不仅能够提供海量数据的存储能力,还包含了数据管理、集群管理、性能优化等核心功能,是现代大数据系统的重要基础设施。