1. 数字化大屏与数据平台概述

数字化大屏是现代企业数据展示的重要工具,通过实时数据采集、处理和可视化展示,为企业决策提供直观的数据支持。本文将详细介绍数据平台的架构设计、实时数据处理和大屏可视化展示的完整解决方案。

1.1 核心功能

  1. 实时数据采集: 多源数据实时采集和整合
  2. 数据处理: 数据清洗、转换和聚合
  3. 可视化展示: 图表、仪表盘和大屏展示
  4. 实时更新: WebSocket实时数据推送
  5. 平台管理: 数据源管理、用户权限控制

1.2 技术架构

1
2
3
4
5
数据源 → 数据采集 → 数据处理 → 数据存储 → 数据展示
↓ ↓ ↓ ↓ ↓
业务系统 → Kafka → 流处理 → Redis/MySQL → 大屏展示
↓ ↓ ↓ ↓ ↓
日志文件 → Flume → 批处理 → 数据仓库 → 报表分析

2. 数据平台架构设计

2.1 数据平台配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 数据平台配置
*/
@Configuration
@EnableScheduling
public class DataPlatformConfig {

@Value("${data.platform.kafka.bootstrap-servers}")
private String kafkaBootstrapServers;

@Value("${data.platform.redis.host}")
private String redisHost;

@Value("${data.platform.redis.port}")
private int redisPort;

@Value("${data.platform.websocket.endpoint}")
private String websocketEndpoint;

/**
* Kafka配置
*/
@Bean
public ProducerFactory<String, String> kafkaProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

return new DefaultKafkaProducerFactory<>(configProps);
}

/**
* Redis配置
*/
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName(redisHost);
config.setPort(redisPort);

LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(2))
.build();

return new LettuceConnectionFactory(config, clientConfig);
}

/**
* WebSocket配置
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

2.2 数据采集服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/**
* 数据采集服务
*/
@Service
public class DataCollectionService {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private DataSourceService dataSourceService;

private static final String DATA_TOPIC = "data-collection";

/**
* 采集业务数据
* @param dataSource 数据源
* @param data 数据
*/
public void collectBusinessData(String dataSource, Object data) {
try {
// 创建数据消息
DataMessage message = DataMessage.builder()
.dataSource(dataSource)
.dataType("BUSINESS")
.data(JSON.toJSONString(data))
.timestamp(System.currentTimeMillis())
.build();

// 发送到Kafka
kafkaTemplate.send(DATA_TOPIC, dataSource, JSON.toJSONString(message));

log.debug("业务数据采集成功: dataSource={}, dataType={}", dataSource, "BUSINESS");

} catch (Exception e) {
log.error("业务数据采集失败: dataSource={}", dataSource, e);
}
}

/**
* 采集系统指标
* @param metrics 指标数据
*/
public void collectSystemMetrics(SystemMetrics metrics) {
try {
// 创建指标消息
DataMessage message = DataMessage.builder()
.dataSource("SYSTEM")
.dataType("METRICS")
.data(JSON.toJSONString(metrics))
.timestamp(System.currentTimeMillis())
.build();

// 发送到Kafka
kafkaTemplate.send(DATA_TOPIC, "SYSTEM", JSON.toJSONString(message));

log.debug("系统指标采集成功: metrics={}", metrics.getMetricName());

} catch (Exception e) {
log.error("系统指标采集失败: metrics={}", metrics.getMetricName(), e);
}
}

/**
* 采集用户行为数据
* @param userBehavior 用户行为
*/
public void collectUserBehavior(UserBehavior userBehavior) {
try {
// 创建用户行为消息
DataMessage message = DataMessage.builder()
.dataSource("USER")
.dataType("BEHAVIOR")
.data(JSON.toJSONString(userBehavior))
.timestamp(System.currentTimeMillis())
.build();

// 发送到Kafka
kafkaTemplate.send(DATA_TOPIC, "USER", JSON.toJSONString(message));

log.debug("用户行为数据采集成功: userId={}, action={}",
userBehavior.getUserId(), userBehavior.getAction());

} catch (Exception e) {
log.error("用户行为数据采集失败: userId={}", userBehavior.getUserId(), e);
}
}
}

/**
* 数据消息实体
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DataMessage {
private String dataSource;
private String dataType;
private String data;
private long timestamp;
}

/**
* 系统指标实体
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SystemMetrics {
private String metricName;
private double value;
private String unit;
private Map<String, String> tags;
private long timestamp;
}

/**
* 用户行为实体
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserBehavior {
private String userId;
private String action;
private String page;
private Map<String, Object> properties;
private long timestamp;
}

3. 实时数据处理

3.1 数据处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/**
* 实时数据处理器
*/
@Component
public class RealTimeDataProcessor {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private DataAggregationService dataAggregationService;

@Autowired
private WebSocketService webSocketService;

/**
* 处理数据消息
* @param message 数据消息
*/
@KafkaListener(topics = "data-collection", groupId = "data-processor-group")
public void processDataMessage(ConsumerRecord<String, String> record) {
try {
// 解析消息
DataMessage message = JSON.parseObject(record.value(), DataMessage.class);

log.debug("处理数据消息: dataSource={}, dataType={}",
message.getDataSource(), message.getDataType());

// 根据数据类型处理
switch (message.getDataType()) {
case "BUSINESS":
processBusinessData(message);
break;
case "METRICS":
processMetricsData(message);
break;
case "BEHAVIOR":
processBehaviorData(message);
break;
default:
log.warn("未知数据类型: {}", message.getDataType());
}

} catch (Exception e) {
log.error("处理数据消息失败: {}", record.value(), e);
}
}

/**
* 处理业务数据
* @param message 数据消息
*/
private void processBusinessData(DataMessage message) {
try {
// 解析业务数据
Object businessData = JSON.parseObject(message.getData(), Object.class);

// 数据清洗和转换
Object cleanedData = cleanAndTransformData(businessData);

// 存储到Redis
String key = "business:data:" + message.getDataSource() + ":" + message.getTimestamp();
redisTemplate.opsForValue().set(key, cleanedData, Duration.ofHours(24));

// 实时推送到前端
webSocketService.sendToAll("business_data", cleanedData);

// 数据聚合
dataAggregationService.aggregateBusinessData(message.getDataSource(), cleanedData);

} catch (Exception e) {
log.error("处理业务数据失败: dataSource={}", message.getDataSource(), e);
}
}

/**
* 处理指标数据
* @param message 数据消息
*/
private void processMetricsData(DataMessage message) {
try {
// 解析指标数据
SystemMetrics metrics = JSON.parseObject(message.getData(), SystemMetrics.class);

// 存储到Redis时间序列
String key = "metrics:" + metrics.getMetricName();
redisTemplate.opsForZSet().add(key, JSON.toJSONString(metrics), metrics.getTimestamp());

// 清理过期数据
redisTemplate.opsForZSet().removeRangeByScore(key, 0,
System.currentTimeMillis() - 86400000); // 保留24小时

// 实时推送到前端
webSocketService.sendToAll("system_metrics", metrics);

// 指标聚合
dataAggregationService.aggregateMetrics(metrics);

} catch (Exception e) {
log.error("处理指标数据失败: metricName={}", message.getData(), e);
}
}

/**
* 处理用户行为数据
* @param message 数据消息
*/
private void processBehaviorData(DataMessage message) {
try {
// 解析用户行为数据
UserBehavior behavior = JSON.parseObject(message.getData(), UserBehavior.class);

// 存储到Redis
String key = "user:behavior:" + behavior.getUserId();
redisTemplate.opsForList().leftPush(key, behavior);
redisTemplate.expire(key, Duration.ofDays(7)); // 保留7天

// 实时推送到前端
webSocketService.sendToAll("user_behavior", behavior);

// 行为分析
dataAggregationService.analyzeUserBehavior(behavior);

} catch (Exception e) {
log.error("处理用户行为数据失败: userId={}", message.getData(), e);
}
}

/**
* 数据清洗和转换
* @param data 原始数据
* @return 清洗后的数据
*/
private Object cleanAndTransformData(Object data) {
// 这里可以实现数据清洗和转换逻辑
// 例如:去除无效数据、格式化数据、数据验证等
return data;
}
}

3.2 数据聚合服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/**
* 数据聚合服务
*/
@Service
public class DataAggregationService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private DataStatisticsService dataStatisticsService;

/**
* 聚合业务数据
* @param dataSource 数据源
* @param data 数据
*/
public void aggregateBusinessData(String dataSource, Object data) {
try {
// 按数据源聚合
String key = "aggregation:business:" + dataSource;
redisTemplate.opsForHash().increment(key, "count", 1);
redisTemplate.expire(key, Duration.ofHours(1));

// 按时间聚合(小时级别)
String hourKey = "aggregation:hour:" + getCurrentHour();
redisTemplate.opsForHash().increment(hourKey, dataSource, 1);
redisTemplate.expire(hourKey, Duration.ofDays(1));

// 更新统计数据
dataStatisticsService.updateBusinessStatistics(dataSource);

} catch (Exception e) {
log.error("聚合业务数据失败: dataSource={}", dataSource, e);
}
}

/**
* 聚合指标数据
* @param metrics 指标数据
*/
public void aggregateMetrics(SystemMetrics metrics) {
try {
// 按指标名称聚合
String key = "aggregation:metrics:" + metrics.getMetricName();

// 计算平均值
redisTemplate.opsForHash().increment(key, "sum", metrics.getValue());
redisTemplate.opsForHash().increment(key, "count", 1);
redisTemplate.expire(key, Duration.ofHours(1));

// 计算最大值和最小值
redisTemplate.opsForHash().putIfAbsent(key, "max", metrics.getValue());
redisTemplate.opsForHash().putIfAbsent(key, "min", metrics.getValue());

Double maxValue = (Double) redisTemplate.opsForHash().get(key, "max");
Double minValue = (Double) redisTemplate.opsForHash().get(key, "min");

if (metrics.getValue() > maxValue) {
redisTemplate.opsForHash().put(key, "max", metrics.getValue());
}
if (metrics.getValue() < minValue) {
redisTemplate.opsForHash().put(key, "min", metrics.getValue());
}

// 更新指标统计
dataStatisticsService.updateMetricsStatistics(metrics);

} catch (Exception e) {
log.error("聚合指标数据失败: metricName={}", metrics.getMetricName(), e);
}
}

/**
* 分析用户行为
* @param behavior 用户行为
*/
public void analyzeUserBehavior(UserBehavior behavior) {
try {
// 按用户聚合
String userKey = "aggregation:user:" + behavior.getUserId();
redisTemplate.opsForHash().increment(userKey, behavior.getAction(), 1);
redisTemplate.expire(userKey, Duration.ofDays(7));

// 按页面聚合
String pageKey = "aggregation:page:" + behavior.getPage();
redisTemplate.opsForHash().increment(pageKey, behavior.getAction(), 1);
redisTemplate.expire(pageKey, Duration.ofDays(1));

// 按时间聚合
String timeKey = "aggregation:time:" + getCurrentHour();
redisTemplate.opsForHash().increment(timeKey, behavior.getAction(), 1);
redisTemplate.expire(timeKey, Duration.ofDays(1));

// 更新行为统计
dataStatisticsService.updateBehaviorStatistics(behavior);

} catch (Exception e) {
log.error("分析用户行为失败: userId={}", behavior.getUserId(), e);
}
}

/**
* 获取当前小时
* @return 当前小时
*/
private String getCurrentHour() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"));
}
}

4. WebSocket实时推送

4.1 WebSocket服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/**
* WebSocket服务
*/
@Component
@ServerEndpoint("/websocket/dashboard")
public class WebSocketService {

private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);

// 存储所有连接的会话
private static final Set<Session> sessions = ConcurrentHashMap.newKeySet();

/**
* 连接建立
* @param session 会话
*/
@OnOpen
public void onOpen(Session session) {
sessions.add(session);
log.info("WebSocket连接建立: sessionId={}", session.getId());

// 发送欢迎消息
sendToSession(session, "welcome", "连接成功");
}

/**
* 接收消息
* @param message 消息
* @param session 会话
*/
@OnMessage
public void onMessage(String message, Session session) {
try {
log.debug("收到WebSocket消息: sessionId={}, message={}", session.getId(), message);

// 解析消息
WebSocketMessage wsMessage = JSON.parseObject(message, WebSocketMessage.class);

// 处理消息
handleMessage(wsMessage, session);

} catch (Exception e) {
log.error("处理WebSocket消息失败: sessionId={}, message={}", session.getId(), message, e);
}
}

/**
* 连接关闭
* @param session 会话
*/
@OnClose
public void onClose(Session session) {
sessions.remove(session);
log.info("WebSocket连接关闭: sessionId={}", session.getId());
}

/**
* 连接错误
* @param session 会话
* @param error 错误
*/
@OnError
public void onError(Session session, Throwable error) {
sessions.remove(session);
log.error("WebSocket连接错误: sessionId={}", session.getId(), error);
}

/**
* 处理消息
* @param message 消息
* @param session 会话
*/
private void handleMessage(WebSocketMessage message, Session session) {
switch (message.getType()) {
case "subscribe":
handleSubscribe(message, session);
break;
case "unsubscribe":
handleUnsubscribe(message, session);
break;
case "ping":
sendToSession(session, "pong", "pong");
break;
default:
log.warn("未知消息类型: {}", message.getType());
}
}

/**
* 处理订阅
* @param message 消息
* @param session 会话
*/
private void handleSubscribe(WebSocketMessage message, Session session) {
try {
String topic = message.getData().toString();
session.getUserProperties().put("subscribed_topic", topic);

sendToSession(session, "subscribed", "订阅成功: " + topic);

} catch (Exception e) {
log.error("处理订阅失败: sessionId={}", session.getId(), e);
}
}

/**
* 处理取消订阅
* @param message 消息
* @param session 会话
*/
private void handleUnsubscribe(WebSocketMessage message, Session session) {
try {
session.getUserProperties().remove("subscribed_topic");
sendToSession(session, "unsubscribed", "取消订阅成功");

} catch (Exception e) {
log.error("处理取消订阅失败: sessionId={}", session.getId(), e);
}
}

/**
* 发送消息到指定会话
* @param session 会话
* @param type 消息类型
* @param data 数据
*/
public void sendToSession(Session session, String type, Object data) {
try {
if (session.isOpen()) {
WebSocketMessage message = new WebSocketMessage(type, data, System.currentTimeMillis());
session.getBasicRemote().sendText(JSON.toJSONString(message));
}
} catch (Exception e) {
log.error("发送WebSocket消息失败: sessionId={}", session.getId(), e);
}
}

/**
* 发送消息到所有连接
* @param type 消息类型
* @param data 数据
*/
public void sendToAll(String type, Object data) {
WebSocketMessage message = new WebSocketMessage(type, data, System.currentTimeMillis());
String messageJson = JSON.toJSONString(message);

sessions.removeIf(session -> {
try {
if (session.isOpen()) {
session.getBasicRemote().sendText(messageJson);
return false;
} else {
return true;
}
} catch (Exception e) {
log.error("发送WebSocket消息失败: sessionId={}", session.getId(), e);
return true;
}
});
}

/**
* 发送消息到订阅了指定主题的连接
* @param topic 主题
* @param type 消息类型
* @param data 数据
*/
public void sendToSubscribers(String topic, String type, Object data) {
WebSocketMessage message = new WebSocketMessage(type, data, System.currentTimeMillis());
String messageJson = JSON.toJSONString(message);

sessions.removeIf(session -> {
try {
if (session.isOpen()) {
String subscribedTopic = (String) session.getUserProperties().get("subscribed_topic");
if (topic.equals(subscribedTopic)) {
session.getBasicRemote().sendText(messageJson);
}
return false;
} else {
return true;
}
} catch (Exception e) {
log.error("发送WebSocket消息失败: sessionId={}", session.getId(), e);
return true;
}
});
}
}

/**
* WebSocket消息实体
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WebSocketMessage {
private String type;
private Object data;
private long timestamp;
}

5. 数据可视化API

5.1 数据展示控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* 数据展示控制器
*/
@RestController
@RequestMapping("/api/dashboard")
public class DashboardController {

@Autowired
private DataStatisticsService dataStatisticsService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 获取实时统计数据
*/
@GetMapping("/realtime")
public ResponseEntity<Map<String, Object>> getRealtimeData() {
try {
Map<String, Object> data = new HashMap<>();

// 获取业务数据统计
data.put("businessStats", dataStatisticsService.getBusinessStatistics());

// 获取系统指标
data.put("systemMetrics", dataStatisticsService.getSystemMetrics());

// 获取用户行为统计
data.put("userBehavior", dataStatisticsService.getUserBehaviorStatistics());

return ResponseEntity.ok(data);

} catch (Exception e) {
log.error("获取实时数据失败", e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Collections.singletonMap("error", e.getMessage()));
}
}

/**
* 获取历史数据
*/
@GetMapping("/history")
public ResponseEntity<List<Map<String, Object>>> getHistoryData(
@RequestParam String dataType,
@RequestParam String timeRange) {
try {
List<Map<String, Object>> data = dataStatisticsService.getHistoryData(dataType, timeRange);
return ResponseEntity.ok(data);

} catch (Exception e) {
log.error("获取历史数据失败: dataType={}, timeRange={}", dataType, timeRange, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Collections.emptyList());
}
}

/**
* 获取图表数据
*/
@GetMapping("/charts")
public ResponseEntity<Map<String, Object>> getChartData(
@RequestParam String chartType,
@RequestParam(required = false) String timeRange) {
try {
Map<String, Object> data = dataStatisticsService.getChartData(chartType, timeRange);
return ResponseEntity.ok(data);

} catch (Exception e) {
log.error("获取图表数据失败: chartType={}, timeRange={}", chartType, timeRange, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body(Collections.singletonMap("error", e.getMessage()));
}
}
}

5.2 数据统计服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
/**
* 数据统计服务
*/
@Service
public class DataStatisticsService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 获取业务数据统计
* @return 业务统计
*/
public Map<String, Object> getBusinessStatistics() {
Map<String, Object> stats = new HashMap<>();

try {
// 获取各数据源的统计
Set<String> keys = redisTemplate.keys("aggregation:business:*");
Map<String, Long> dataSourceStats = new HashMap<>();

if (keys != null) {
for (String key : keys) {
String dataSource = key.substring(key.lastIndexOf(":") + 1);
Object count = redisTemplate.opsForHash().get(key, "count");
dataSourceStats.put(dataSource, count != null ? Long.valueOf(count.toString()) : 0L);
}
}

stats.put("dataSourceStats", dataSourceStats);
stats.put("totalCount", dataSourceStats.values().stream().mapToLong(Long::longValue).sum());

} catch (Exception e) {
log.error("获取业务数据统计失败", e);
}

return stats;
}

/**
* 获取系统指标
* @return 系统指标
*/
public Map<String, Object> getSystemMetrics() {
Map<String, Object> metrics = new HashMap<>();

try {
// 获取各指标的统计
Set<String> keys = redisTemplate.keys("aggregation:metrics:*");
Map<String, Map<String, Object>> metricStats = new HashMap<>();

if (keys != null) {
for (String key : keys) {
String metricName = key.substring(key.lastIndexOf(":") + 1);
Map<Object, Object> rawStats = redisTemplate.opsForHash().entries(key);

Map<String, Object> stats = new HashMap<>();
stats.put("sum", rawStats.get("sum"));
stats.put("count", rawStats.get("count"));
stats.put("max", rawStats.get("max"));
stats.put("min", rawStats.get("min"));

// 计算平均值
Double sum = (Double) rawStats.get("sum");
Long count = (Long) rawStats.get("count");
if (sum != null && count != null && count > 0) {
stats.put("avg", sum / count);
}

metricStats.put(metricName, stats);
}
}

metrics.put("metricStats", metricStats);

} catch (Exception e) {
log.error("获取系统指标失败", e);
}

return metrics;
}

/**
* 获取用户行为统计
* @return 用户行为统计
*/
public Map<String, Object> getUserBehaviorStatistics() {
Map<String, Object> stats = new HashMap<>();

try {
// 获取页面访问统计
Set<String> pageKeys = redisTemplate.keys("aggregation:page:*");
Map<String, Map<String, Long>> pageStats = new HashMap<>();

if (pageKeys != null) {
for (String key : pageKeys) {
String page = key.substring(key.lastIndexOf(":") + 1);
Map<Object, Object> rawStats = redisTemplate.opsForHash().entries(key);

Map<String, Long> pageData = new HashMap<>();
rawStats.forEach((k, v) -> pageData.put(k.toString(), Long.valueOf(v.toString())));

pageStats.put(page, pageData);
}
}

stats.put("pageStats", pageStats);

// 获取时间分布统计
Set<String> timeKeys = redisTemplate.keys("aggregation:time:*");
Map<String, Map<String, Long>> timeStats = new HashMap<>();

if (timeKeys != null) {
for (String key : timeKeys) {
String time = key.substring(key.lastIndexOf(":") + 1);
Map<Object, Object> rawStats = redisTemplate.opsForHash().entries(key);

Map<String, Long> timeData = new HashMap<>();
rawStats.forEach((k, v) -> timeData.put(k.toString(), Long.valueOf(v.toString())));

timeStats.put(time, timeData);
}
}

stats.put("timeStats", timeStats);

} catch (Exception e) {
log.error("获取用户行为统计失败", e);
}

return stats;
}

/**
* 获取历史数据
* @param dataType 数据类型
* @param timeRange 时间范围
* @return 历史数据
*/
public List<Map<String, Object>> getHistoryData(String dataType, String timeRange) {
List<Map<String, Object>> data = new ArrayList<>();

try {
// 根据数据类型和时间范围获取历史数据
// 这里可以实现具体的历史数据查询逻辑

} catch (Exception e) {
log.error("获取历史数据失败: dataType={}, timeRange={}", dataType, timeRange, e);
}

return data;
}

/**
* 获取图表数据
* @param chartType 图表类型
* @param timeRange 时间范围
* @return 图表数据
*/
public Map<String, Object> getChartData(String chartType, String timeRange) {
Map<String, Object> data = new HashMap<>();

try {
switch (chartType) {
case "line":
data = getLineChartData(timeRange);
break;
case "bar":
data = getBarChartData(timeRange);
break;
case "pie":
data = getPieChartData(timeRange);
break;
default:
log.warn("未知图表类型: {}", chartType);
}

} catch (Exception e) {
log.error("获取图表数据失败: chartType={}, timeRange={}", chartType, timeRange, e);
}

return data;
}

/**
* 获取折线图数据
*/
private Map<String, Object> getLineChartData(String timeRange) {
Map<String, Object> data = new HashMap<>();
// 实现折线图数据获取逻辑
return data;
}

/**
* 获取柱状图数据
*/
private Map<String, Object> getBarChartData(String timeRange) {
Map<String, Object> data = new HashMap<>();
// 实现柱状图数据获取逻辑
return data;
}

/**
* 获取饼图数据
*/
private Map<String, Object> getPieChartData(String timeRange) {
Map<String, Object> data = new HashMap<>();
// 实现饼图数据获取逻辑
return data;
}
}

6. 总结

通过数字化大屏与数据平台的构建,我们成功实现了一个完整的实时数据可视化系统。关键特性包括:

6.1 核心优势

  1. 实时数据采集: 多源数据实时采集和整合
  2. 数据处理: 高效的数据清洗、转换和聚合
  3. 可视化展示: 丰富的图表和大屏展示
  4. 实时推送: WebSocket实时数据推送
  5. 平台管理: 完善的数据源管理和权限控制

6.2 最佳实践

  1. 数据架构: 合理的数据分层和存储策略
  2. 实时处理: 高效的流式数据处理
  3. 可视化设计: 直观的数据展示和交互
  4. 性能优化: 缓存和索引优化
  5. 监控告警: 完善的系统监控和异常处理

这套数字化大屏方案不仅能够提供直观的数据展示,还为业务决策提供了强有力的数据支持,是现代企业数字化转型的重要工具。