1. MinIO断点续传功能概述

MinIO是一个高性能的对象存储服务,支持S3兼容的API。断点续传功能是处理大文件上传的重要特性,通过分片上传和断点续传机制,可以显著提高大文件上传的成功率和用户体验。本文将详细介绍MinIO的配置、分片上传、断点续传和文件管理的完整实现。

1.1 核心功能

  1. 分片上传: 大文件分片上传和合并
  2. 断点续传: 上传中断后从断点继续
  3. 进度跟踪: 实时上传进度监控
  4. 文件管理: 文件列表、删除、预览
  5. 错误处理: 完善的错误处理和重试机制

1.2 技术架构

1
2
3
4
5
文件上传 → 分片处理 → MinIO存储 → 分片合并
↓ ↓ ↓ ↓
前端界面 → 进度跟踪 → 断点续传 → 完成通知
↓ ↓ ↓ ↓
文件管理 → 状态监控 → 错误处理 → 结果返回

2. MinIO配置与初始化

2.1 MinIO配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* MinIO配置类
*/
@Configuration
public class MinIOConfig {

@Value("${minio.endpoint}")
private String endpoint;

@Value("${minio.access-key}")
private String accessKey;

@Value("${minio.secret-key}")
private String secretKey;

@Value("${minio.bucket-name}")
private String bucketName;

@Value("${minio.chunk-size}")
private long chunkSize;

@Value("${minio.max-file-size}")
private long maxFileSize;

/**
* MinIO客户端
*/
@Bean
public MinioClient minioClient() {
return MinioClient.builder()
.endpoint(endpoint)
.credentials(accessKey, secretKey)
.build();
}

/**
* MinIO配置属性
*/
@Bean
public MinIOProperties minioProperties() {
return MinIOProperties.builder()
.endpoint(endpoint)
.accessKey(accessKey)
.secretKey(secretKey)
.bucketName(bucketName)
.chunkSize(chunkSize)
.maxFileSize(maxFileSize)
.build();
}

/**
* 初始化MinIO
*/
@PostConstruct
public void initMinIO() {
try {
MinioClient client = minioClient();

// 检查存储桶是否存在
boolean bucketExists = client.bucketExists(BucketExistsArgs.builder()
.bucket(bucketName)
.build());

if (!bucketExists) {
// 创建存储桶
client.makeBucket(MakeBucketArgs.builder()
.bucket(bucketName)
.build());

log.info("创建MinIO存储桶成功: {}", bucketName);
} else {
log.info("MinIO存储桶已存在: {}", bucketName);
}

} catch (Exception e) {
log.error("初始化MinIO失败", e);
throw new RuntimeException("初始化MinIO失败", e);
}
}
}

/**
* MinIO配置属性
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MinIOProperties {
private String endpoint;
private String accessKey;
private String secretKey;
private String bucketName;
private long chunkSize;
private long maxFileSize;
}

2.2 应用配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# application.yml
minio:
endpoint: http://localhost:9000
access-key: minioadmin
secret-key: minioadmin
bucket-name: uploads
chunk-size: 5242880 # 5MB
max-file-size: 1073741824 # 1GB

# 文件上传配置
file:
upload:
temp-path: /tmp/uploads/
max-chunks: 1000
chunk-expire-time: 86400 # 24小时

3. 分片上传服务

3.1 分片上传服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
/**
* 分片上传服务
*/
@Service
public class ChunkUploadService {

@Autowired
private MinioClient minioClient;

@Autowired
private MinIOProperties minioProperties;

@Autowired
private UploadProgressService progressService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 初始化分片上传
* @param fileName 文件名
* @param fileSize 文件大小
* @param chunkCount 分片数量
* @return 上传ID
*/
public String initChunkUpload(String fileName, long fileSize, int chunkCount) {
try {
// 1. 验证文件大小
if (fileSize > minioProperties.getMaxFileSize()) {
throw new BusinessException("文件大小超过限制");
}

// 2. 生成上传ID
String uploadId = generateUploadId(fileName);

// 3. 创建上传记录
ChunkUploadInfo uploadInfo = ChunkUploadInfo.builder()
.uploadId(uploadId)
.fileName(fileName)
.fileSize(fileSize)
.chunkCount(chunkCount)
.uploadedChunks(new HashSet<>())
.status("INITIALIZED")
.createTime(LocalDateTime.now())
.build();

// 4. 保存到Redis
String key = "chunk:upload:" + uploadId;
redisTemplate.opsForValue().set(key, uploadInfo, Duration.ofHours(24));

// 5. 初始化进度
progressService.initProgress(uploadId, chunkCount);

log.info("初始化分片上传成功: uploadId={}, fileName={}, chunkCount={}",
uploadId, fileName, chunkCount);

return uploadId;

} catch (Exception e) {
log.error("初始化分片上传失败: fileName={}", fileName, e);
throw new BusinessException("初始化分片上传失败: " + e.getMessage());
}
}

/**
* 上传分片
* @param uploadId 上传ID
* @param chunkNumber 分片编号
* @param chunkData 分片数据
* @return 上传结果
*/
public ChunkUploadResult uploadChunk(String uploadId, int chunkNumber, byte[] chunkData) {
try {
// 1. 获取上传信息
ChunkUploadInfo uploadInfo = getUploadInfo(uploadId);
if (uploadInfo == null) {
throw new BusinessException("上传信息不存在");
}

// 2. 检查分片是否已上传
if (uploadInfo.getUploadedChunks().contains(chunkNumber)) {
return ChunkUploadResult.success("分片已存在");
}

// 3. 上传分片到MinIO
String objectName = uploadId + "/" + chunkNumber;

minioClient.putObject(PutObjectArgs.builder()
.bucket(minioProperties.getBucketName())
.object(objectName)
.stream(new ByteArrayInputStream(chunkData), chunkData.length, -1)
.build());

// 4. 更新上传信息
uploadInfo.getUploadedChunks().add(chunkNumber);
uploadInfo.setUpdateTime(LocalDateTime.now());

String key = "chunk:upload:" + uploadId;
redisTemplate.opsForValue().set(key, uploadInfo, Duration.ofHours(24));

// 5. 更新进度
progressService.updateProgress(uploadId, chunkNumber);

// 6. 检查是否完成
if (uploadInfo.getUploadedChunks().size() == uploadInfo.getChunkCount()) {
// 触发分片合并
CompletableFuture.runAsync(() -> mergeChunks(uploadId));
}

log.debug("分片上传成功: uploadId={}, chunkNumber={}", uploadId, chunkNumber);

return ChunkUploadResult.success("分片上传成功");

} catch (Exception e) {
log.error("分片上传失败: uploadId={}, chunkNumber={}", uploadId, chunkNumber, e);
return ChunkUploadResult.error("分片上传失败: " + e.getMessage());
}
}

/**
* 合并分片
* @param uploadId 上传ID
* @return 合并结果
*/
public ChunkMergeResult mergeChunks(String uploadId) {
try {
// 1. 获取上传信息
ChunkUploadInfo uploadInfo = getUploadInfo(uploadId);
if (uploadInfo == null) {
throw new BusinessException("上传信息不存在");
}

// 2. 检查所有分片是否已上传
if (uploadInfo.getUploadedChunks().size() != uploadInfo.getChunkCount()) {
throw new BusinessException("分片未完全上传");
}

// 3. 更新状态为合并中
uploadInfo.setStatus("MERGING");
updateUploadInfo(uploadInfo);

// 4. 创建合并后的对象
String finalObjectName = uploadInfo.getFileName();
List<ComposeSource> sources = new ArrayList<>();

for (int i = 0; i < uploadInfo.getChunkCount(); i++) {
String chunkObjectName = uploadId + "/" + i;
sources.add(ComposeSource.builder()
.bucket(minioProperties.getBucketName())
.object(chunkObjectName)
.build());
}

// 5. 执行合并
minioClient.composeObject(ComposeObjectArgs.builder()
.bucket(minioProperties.getBucketName())
.object(finalObjectName)
.sources(sources)
.build());

// 6. 删除分片文件
deleteChunkFiles(uploadId, uploadInfo.getChunkCount());

// 7. 更新状态为完成
uploadInfo.setStatus("COMPLETED");
uploadInfo.setFinalObjectName(finalObjectName);
uploadInfo.setCompleteTime(LocalDateTime.now());
updateUploadInfo(uploadInfo);

// 8. 更新进度为完成
progressService.completeProgress(uploadId);

log.info("分片合并完成: uploadId={}, fileName={}", uploadId, uploadInfo.getFileName());

return ChunkMergeResult.success(finalObjectName);

} catch (Exception e) {
log.error("分片合并失败: uploadId={}", uploadId, e);

// 更新状态为失败
ChunkUploadInfo uploadInfo = getUploadInfo(uploadId);
if (uploadInfo != null) {
uploadInfo.setStatus("FAILED");
uploadInfo.setErrorMessage(e.getMessage());
updateUploadInfo(uploadInfo);
}

return ChunkMergeResult.error("分片合并失败: " + e.getMessage());
}
}

/**
* 获取上传信息
* @param uploadId 上传ID
* @return 上传信息
*/
public ChunkUploadInfo getUploadInfo(String uploadId) {
String key = "chunk:upload:" + uploadId;
return (ChunkUploadInfo) redisTemplate.opsForValue().get(key);
}

/**
* 更新上传信息
* @param uploadInfo 上传信息
*/
private void updateUploadInfo(ChunkUploadInfo uploadInfo) {
String key = "chunk:upload:" + uploadInfo.getUploadId();
redisTemplate.opsForValue().set(key, uploadInfo, Duration.ofHours(24));
}

/**
* 删除分片文件
* @param uploadId 上传ID
* @param chunkCount 分片数量
*/
private void deleteChunkFiles(String uploadId, int chunkCount) {
try {
List<DeleteObject> objects = new ArrayList<>();

for (int i = 0; i < chunkCount; i++) {
String chunkObjectName = uploadId + "/" + i;
objects.add(new DeleteObject(chunkObjectName));
}

Iterable<Result<DeleteError>> results = minioClient.removeObjects(
RemoveObjectsArgs.builder()
.bucket(minioProperties.getBucketName())
.objects(objects)
.build());

// 检查删除结果
for (Result<DeleteError> result : results) {
DeleteError error = result.get();
if (error != null) {
log.warn("删除分片文件失败: {}", error.objectName());
}
}

} catch (Exception e) {
log.error("删除分片文件失败: uploadId={}", uploadId, e);
}
}

/**
* 生成上传ID
* @param fileName 文件名
* @return 上传ID
*/
private String generateUploadId(String fileName) {
String timestamp = String.valueOf(System.currentTimeMillis());
String uuid = UUID.randomUUID().toString().substring(0, 8);
return fileName + "_" + timestamp + "_" + uuid;
}
}

/**
* 分片上传信息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChunkUploadInfo {
private String uploadId;
private String fileName;
private long fileSize;
private int chunkCount;
private Set<Integer> uploadedChunks;
private String status;
private String finalObjectName;
private String errorMessage;
private LocalDateTime createTime;
private LocalDateTime updateTime;
private LocalDateTime completeTime;
}

/**
* 分片上传结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChunkUploadResult {
private boolean success;
private String message;
private int uploadedChunks;
private int totalChunks;

public static ChunkUploadResult success(String message) {
return ChunkUploadResult.builder()
.success(true)
.message(message)
.build();
}

public static ChunkUploadResult error(String message) {
return ChunkUploadResult.builder()
.success(false)
.message(message)
.build();
}
}

/**
* 分片合并结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ChunkMergeResult {
private boolean success;
private String objectName;
private String message;

public static ChunkMergeResult success(String objectName) {
return ChunkMergeResult.builder()
.success(true)
.objectName(objectName)
.build();
}

public static ChunkMergeResult error(String message) {
return ChunkMergeResult.builder()
.success(false)
.message(message)
.build();
}
}

4. 断点续传服务

4.1 断点续传服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
/**
* 断点续传服务
*/
@Service
public class ResumeUploadService {

@Autowired
private ChunkUploadService chunkUploadService;

@Autowired
private UploadProgressService progressService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 检查上传状态
* @param uploadId 上传ID
* @return 上传状态
*/
public UploadStatus checkUploadStatus(String uploadId) {
try {
ChunkUploadInfo uploadInfo = chunkUploadService.getUploadInfo(uploadId);
if (uploadInfo == null) {
return UploadStatus.notFound();
}

UploadStatus status = new UploadStatus();
status.setUploadId(uploadId);
status.setFileName(uploadInfo.getFileName());
status.setFileSize(uploadInfo.getFileSize());
status.setChunkCount(uploadInfo.getChunkCount());
status.setUploadedChunks(uploadInfo.getUploadedChunks());
status.setStatus(uploadInfo.getStatus());
status.setProgress(calculateProgress(uploadInfo));

return status;

} catch (Exception e) {
log.error("检查上传状态失败: uploadId={}", uploadId, e);
return UploadStatus.error("检查上传状态失败: " + e.getMessage());
}
}

/**
* 恢复上传
* @param uploadId 上传ID
* @return 恢复结果
*/
public ResumeResult resumeUpload(String uploadId) {
try {
ChunkUploadInfo uploadInfo = chunkUploadService.getUploadInfo(uploadId);
if (uploadInfo == null) {
return ResumeResult.error("上传信息不存在");
}

if ("COMPLETED".equals(uploadInfo.getStatus())) {
return ResumeResult.success("上传已完成");
}

if ("FAILED".equals(uploadInfo.getStatus())) {
return ResumeResult.error("上传已失败: " + uploadInfo.getErrorMessage());
}

// 检查哪些分片需要重新上传
List<Integer> missingChunks = findMissingChunks(uploadInfo);

ResumeResult result = new ResumeResult();
result.setSuccess(true);
result.setUploadId(uploadId);
result.setMissingChunks(missingChunks);
result.setMessage("可以继续上传");

log.info("恢复上传成功: uploadId={}, missingChunks={}", uploadId, missingChunks.size());

return result;

} catch (Exception e) {
log.error("恢复上传失败: uploadId={}", uploadId, e);
return ResumeResult.error("恢复上传失败: " + e.getMessage());
}
}

/**
* 取消上传
* @param uploadId 上传ID
* @return 取消结果
*/
public CancelResult cancelUpload(String uploadId) {
try {
ChunkUploadInfo uploadInfo = chunkUploadService.getUploadInfo(uploadId);
if (uploadInfo == null) {
return CancelResult.error("上传信息不存在");
}

// 更新状态为取消
uploadInfo.setStatus("CANCELLED");
uploadInfo.setUpdateTime(LocalDateTime.now());

String key = "chunk:upload:" + uploadId;
redisTemplate.opsForValue().set(key, uploadInfo, Duration.ofHours(1));

// 删除已上传的分片
deleteUploadedChunks(uploadId, uploadInfo.getUploadedChunks());

// 清除进度
progressService.clearProgress(uploadId);

log.info("取消上传成功: uploadId={}", uploadId);

return CancelResult.success("上传已取消");

} catch (Exception e) {
log.error("取消上传失败: uploadId={}", uploadId, e);
return CancelResult.error("取消上传失败: " + e.getMessage());
}
}

/**
* 查找缺失的分片
* @param uploadInfo 上传信息
* @return 缺失的分片列表
*/
private List<Integer> findMissingChunks(ChunkUploadInfo uploadInfo) {
List<Integer> missingChunks = new ArrayList<>();

for (int i = 0; i < uploadInfo.getChunkCount(); i++) {
if (!uploadInfo.getUploadedChunks().contains(i)) {
missingChunks.add(i);
}
}

return missingChunks;
}

/**
* 计算上传进度
* @param uploadInfo 上传信息
* @return 进度百分比
*/
private double calculateProgress(ChunkUploadInfo uploadInfo) {
if (uploadInfo.getChunkCount() == 0) {
return 0.0;
}

return (double) uploadInfo.getUploadedChunks().size() / uploadInfo.getChunkCount() * 100;
}

/**
* 删除已上传的分片
* @param uploadId 上传ID
* @param uploadedChunks 已上传的分片
*/
private void deleteUploadedChunks(String uploadId, Set<Integer> uploadedChunks) {
try {
// 这里可以实现删除MinIO中已上传分片的逻辑
log.debug("删除已上传分片: uploadId={}, count={}", uploadId, uploadedChunks.size());

} catch (Exception e) {
log.error("删除已上传分片失败: uploadId={}", uploadId, e);
}
}
}

/**
* 上传状态
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UploadStatus {
private String uploadId;
private String fileName;
private long fileSize;
private int chunkCount;
private Set<Integer> uploadedChunks;
private String status;
private double progress;
private String message;

public static UploadStatus notFound() {
return UploadStatus.builder()
.status("NOT_FOUND")
.message("上传信息不存在")
.build();
}

public static UploadStatus error(String message) {
return UploadStatus.builder()
.status("ERROR")
.message(message)
.build();
}
}

/**
* 恢复结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ResumeResult {
private boolean success;
private String uploadId;
private List<Integer> missingChunks;
private String message;

public static ResumeResult success(String message) {
return ResumeResult.builder()
.success(true)
.message(message)
.build();
}

public static ResumeResult error(String message) {
return ResumeResult.builder()
.success(false)
.message(message)
.build();
}
}

/**
* 取消结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CancelResult {
private boolean success;
private String message;

public static CancelResult success(String message) {
return CancelResult.builder()
.success(true)
.message(message)
.build();
}

public static CancelResult error(String message) {
return CancelResult.builder()
.success(false)
.message(message)
.build();
}
}

5. 上传进度服务

5.1 上传进度服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
/**
* 上传进度服务
*/
@Service
public class UploadProgressService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private SimpMessagingTemplate messagingTemplate;

/**
* 初始化进度
* @param uploadId 上传ID
* @param totalChunks 总分片数
*/
public void initProgress(String uploadId, int totalChunks) {
try {
UploadProgress progress = UploadProgress.builder()
.uploadId(uploadId)
.totalChunks(totalChunks)
.uploadedChunks(0)
.progress(0.0)
.status("UPLOADING")
.startTime(LocalDateTime.now())
.build();

String key = "upload:progress:" + uploadId;
redisTemplate.opsForValue().set(key, progress, Duration.ofHours(24));

// 发送进度通知
sendProgressNotification(uploadId, progress);

} catch (Exception e) {
log.error("初始化进度失败: uploadId={}", uploadId, e);
}
}

/**
* 更新进度
* @param uploadId 上传ID
* @param chunkNumber 分片编号
*/
public void updateProgress(String uploadId, int chunkNumber) {
try {
String key = "upload:progress:" + uploadId;
UploadProgress progress = (UploadProgress) redisTemplate.opsForValue().get(key);

if (progress != null) {
progress.setUploadedChunks(progress.getUploadedChunks() + 1);
progress.setProgress((double) progress.getUploadedChunks() / progress.getTotalChunks() * 100);
progress.setUpdateTime(LocalDateTime.now());

redisTemplate.opsForValue().set(key, progress, Duration.ofHours(24));

// 发送进度通知
sendProgressNotification(uploadId, progress);
}

} catch (Exception e) {
log.error("更新进度失败: uploadId={}, chunkNumber={}", uploadId, chunkNumber, e);
}
}

/**
* 完成进度
* @param uploadId 上传ID
*/
public void completeProgress(String uploadId) {
try {
String key = "upload:progress:" + uploadId;
UploadProgress progress = (UploadProgress) redisTemplate.opsForValue().get(key);

if (progress != null) {
progress.setStatus("COMPLETED");
progress.setProgress(100.0);
progress.setCompleteTime(LocalDateTime.now());

redisTemplate.opsForValue().set(key, progress, Duration.ofHours(1));

// 发送完成通知
sendProgressNotification(uploadId, progress);
}

} catch (Exception e) {
log.error("完成进度失败: uploadId={}", uploadId, e);
}
}

/**
* 失败进度
* @param uploadId 上传ID
* @param errorMessage 错误消息
*/
public void failProgress(String uploadId, String errorMessage) {
try {
String key = "upload:progress:" + uploadId;
UploadProgress progress = (UploadProgress) redisTemplate.opsForValue().get(key);

if (progress != null) {
progress.setStatus("FAILED");
progress.setErrorMessage(errorMessage);
progress.setFailTime(LocalDateTime.now());

redisTemplate.opsForValue().set(key, progress, Duration.ofHours(1));

// 发送失败通知
sendProgressNotification(uploadId, progress);
}

} catch (Exception e) {
log.error("失败进度失败: uploadId={}", uploadId, e);
}
}

/**
* 清除进度
* @param uploadId 上传ID
*/
public void clearProgress(String uploadId) {
try {
String key = "upload:progress:" + uploadId;
redisTemplate.delete(key);

} catch (Exception e) {
log.error("清除进度失败: uploadId={}", uploadId, e);
}
}

/**
* 获取进度
* @param uploadId 上传ID
* @return 进度信息
*/
public UploadProgress getProgress(String uploadId) {
try {
String key = "upload:progress:" + uploadId;
return (UploadProgress) redisTemplate.opsForValue().get(key);

} catch (Exception e) {
log.error("获取进度失败: uploadId={}", uploadId, e);
return null;
}
}

/**
* 发送进度通知
* @param uploadId 上传ID
* @param progress 进度信息
*/
private void sendProgressNotification(String uploadId, UploadProgress progress) {
try {
ProgressNotification notification = ProgressNotification.builder()
.uploadId(uploadId)
.progress(progress.getProgress())
.uploadedChunks(progress.getUploadedChunks())
.totalChunks(progress.getTotalChunks())
.status(progress.getStatus())
.timestamp(LocalDateTime.now())
.build();

messagingTemplate.convertAndSend("/topic/upload-progress/" + uploadId, notification);

} catch (Exception e) {
log.error("发送进度通知失败: uploadId={}", uploadId, e);
}
}
}

/**
* 上传进度
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UploadProgress {
private String uploadId;
private int totalChunks;
private int uploadedChunks;
private double progress;
private String status;
private String errorMessage;
private LocalDateTime startTime;
private LocalDateTime updateTime;
private LocalDateTime completeTime;
private LocalDateTime failTime;
}

/**
* 进度通知
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProgressNotification {
private String uploadId;
private double progress;
private int uploadedChunks;
private int totalChunks;
private String status;
private LocalDateTime timestamp;
}

6. 文件管理控制器

6.1 文件上传控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/**
* 文件上传控制器
*/
@RestController
@RequestMapping("/api/upload")
public class FileUploadController {

@Autowired
private ChunkUploadService chunkUploadService;

@Autowired
private ResumeUploadService resumeUploadService;

@Autowired
private UploadProgressService progressService;

/**
* 初始化分片上传
*/
@PostMapping("/init")
public ResponseEntity<Map<String, Object>> initChunkUpload(
@RequestParam String fileName,
@RequestParam long fileSize,
@RequestParam int chunkCount) {

try {
String uploadId = chunkUploadService.initChunkUpload(fileName, fileSize, chunkCount);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("uploadId", uploadId);
result.put("chunkSize", 5242880); // 5MB

return ResponseEntity.ok(result);

} catch (Exception e) {
log.error("初始化分片上传失败: fileName={}", fileName, e);

Map<String, Object> result = new HashMap<>();
result.put("success", false);
result.put("message", e.getMessage());

return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(result);
}
}

/**
* 上传分片
*/
@PostMapping("/chunk")
public ResponseEntity<Map<String, Object>> uploadChunk(
@RequestParam String uploadId,
@RequestParam int chunkNumber,
@RequestParam("file") MultipartFile file) {

try {
byte[] chunkData = file.getBytes();
ChunkUploadResult result = chunkUploadService.uploadChunk(uploadId, chunkNumber, chunkData);

Map<String, Object> response = new HashMap<>();
response.put("success", result.isSuccess());
response.put("message", result.getMessage());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("上传分片失败: uploadId={}, chunkNumber={}", uploadId, chunkNumber, e);

Map<String, Object> result = new HashMap<>();
result.put("success", false);
result.put("message", e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
}
}

/**
* 检查上传状态
*/
@GetMapping("/status/{uploadId}")
public ResponseEntity<UploadStatus> checkUploadStatus(@PathVariable String uploadId) {
try {
UploadStatus status = resumeUploadService.checkUploadStatus(uploadId);
return ResponseEntity.ok(status);

} catch (Exception e) {
log.error("检查上传状态失败: uploadId={}", uploadId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
}
}

/**
* 恢复上传
*/
@PostMapping("/resume/{uploadId}")
public ResponseEntity<ResumeResult> resumeUpload(@PathVariable String uploadId) {
try {
ResumeResult result = resumeUploadService.resumeUpload(uploadId);
return ResponseEntity.ok(result);

} catch (Exception e) {
log.error("恢复上传失败: uploadId={}", uploadId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
}
}

/**
* 取消上传
*/
@PostMapping("/cancel/{uploadId}")
public ResponseEntity<CancelResult> cancelUpload(@PathVariable String uploadId) {
try {
CancelResult result = resumeUploadService.cancelUpload(uploadId);
return ResponseEntity.ok(result);

} catch (Exception e) {
log.error("取消上传失败: uploadId={}", uploadId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
}
}

/**
* 获取上传进度
*/
@GetMapping("/progress/{uploadId}")
public ResponseEntity<UploadProgress> getUploadProgress(@PathVariable String uploadId) {
try {
UploadProgress progress = progressService.getProgress(uploadId);
if (progress != null) {
return ResponseEntity.ok(progress);
} else {
return ResponseEntity.notFound().build();
}

} catch (Exception e) {
log.error("获取上传进度失败: uploadId={}", uploadId, e);
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(null);
}
}
}

7. WebSocket进度推送

7.1 WebSocket配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* WebSocket配置
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
}

/**
* WebSocket控制器
*/
@Controller
public class WebSocketController {

@MessageMapping("/upload/progress")
@SendTo("/topic/upload-progress")
public ProgressNotification handleProgressMessage(ProgressRequest request) {
// 处理进度消息
return ProgressNotification.builder()
.uploadId(request.getUploadId())
.progress(request.getProgress())
.status(request.getStatus())
.timestamp(LocalDateTime.now())
.build();
}
}

/**
* 进度请求
*/
@Data
public class ProgressRequest {
private String uploadId;
private double progress;
private String status;
}

8. 总结

通过MinIO断点续传功能的实现,我们成功构建了一个完整的大文件上传系统。关键特性包括:

8.1 核心优势

  1. 分片上传: 大文件分片上传和合并
  2. 断点续传: 上传中断后从断点继续
  3. 进度跟踪: 实时上传进度监控
  4. 文件管理: 文件列表、删除、预览
  5. 错误处理: 完善的错误处理和重试机制

8.2 最佳实践

  1. 分片策略: 合理的分片大小和数量
  2. 断点续传: 可靠的状态管理和恢复机制
  3. 进度监控: 实时进度推送和状态跟踪
  4. 错误处理: 完善的异常处理和重试策略
  5. 性能优化: 并发上传和资源管理

这套MinIO断点续传方案不仅能够处理大文件上传,还提供了完善的断点续传和进度监控功能,是现代文件上传系统的重要基础设施。