1. 跨实例重复执行问题概述

在分布式系统中,多个服务实例同时执行相同的任务是一个常见问题。这种重复执行不仅会浪费系统资源,还可能导致数据不一致、业务逻辑错误等严重后果。本文将详细介绍基于分布式锁、看门狗续约和TTL兜底的完整解决方案。

1.1 问题场景

  1. 定时任务重复执行: 多个实例同时执行定时任务
  2. 消息重复消费: 消息队列的重复消费问题
  3. 数据同步重复: 数据同步过程中的重复操作
  4. 业务逻辑重复: 关键业务逻辑的重复执行

1.2 解决方案

整体处理流程如下(上行为执行步骤,下行为每一步对应的保障机制):

业务请求 → 分布式锁获取 → 看门狗续约 → 业务执行 → 锁释放
   ↓            ↓             ↓           ↓          ↓
多实例竞争 → 锁竞争机制 → 自动续约 → 单实例执行 → TTL兜底

2. Redis分布式锁实现

2.1 基础分布式锁

基础分布式锁的示例代码(Java)如下:
/**
 * Basic Redis-backed distributed lock.
 *
 * <p>A lock is one Redis string key ({@code LOCK_PREFIX + lockKey}) whose value identifies the
 * owner. Acquisition is "set if absent" with an expiry; release runs a Lua script so that the
 * owner comparison and the delete happen as one atomic step on the server.
 *
 * <p>Every method catches Redis exceptions, logs them and reports failure through its return
 * value instead of throwing.
 */
@Component
public class RedisDistributedLock {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String LOCK_PREFIX = "distributed:lock:";
    private static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('del', KEYS[1]) " +
            "else " +
            " return 0 " +
            "end";

    /**
     * Shared script object. Creating a fresh DefaultRedisScript for every release would make
     * the script's SHA1 digest be recomputed on every call.
     */
    private static final DefaultRedisScript<Long> UNLOCK_REDIS_SCRIPT =
            new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);

    /**
     * Tries once (non-blocking) to acquire the lock.
     *
     * @param lockKey    logical lock name (the Redis prefix is added here)
     * @param lockValue  token identifying the owner; needed again to release
     * @param expireTime time-to-live of the lock key, in seconds
     * @return {@code true} if the key was set by this call; {@code false} if it already
     *         existed or a Redis error occurred
     */
    public boolean tryLock(String lockKey, String lockValue, long expireTime) {
        try {
            String key = LOCK_PREFIX + lockKey;
            Boolean success = redisTemplate.opsForValue()
                    .setIfAbsent(key, lockValue, Duration.ofSeconds(expireTime));

            // The template may hand back null; treat anything but TRUE as "not acquired".
            if (Boolean.TRUE.equals(success)) {
                log.info("获取锁成功: lockKey={}, lockValue={}, expireTime={}s",
                        lockKey, lockValue, expireTime);
                return true;
            }
            log.debug("获取锁失败: lockKey={}, lockValue={}", lockKey, lockValue);
            return false;
        } catch (Exception e) {
            log.error("获取锁异常: lockKey={}, lockValue={}", lockKey, lockValue, e);
            return false;
        }
    }

    /**
     * Releases the lock only if it is still owned by {@code lockValue}.
     *
     * @param lockKey   logical lock name
     * @param lockValue owner token that was used to acquire the lock
     * @return {@code true} if the key was deleted; {@code false} if the stored value did not
     *         match, the key was absent, or a Redis error occurred
     */
    public boolean releaseLock(String lockKey, String lockValue) {
        try {
            String key = LOCK_PREFIX + lockKey;

            // Compare-and-delete runs as a Lua script so no other client can slip in between.
            Long result = redisTemplate.execute(UNLOCK_REDIS_SCRIPT,
                    Collections.singletonList(key), lockValue);

            boolean success = result != null && result == 1;
            if (success) {
                log.info("释放锁成功: lockKey={}, lockValue={}", lockKey, lockValue);
            } else {
                log.warn("释放锁失败: lockKey={}, lockValue={}", lockKey, lockValue);
            }
            return success;
        } catch (Exception e) {
            log.error("释放锁异常: lockKey={}, lockValue={}", lockKey, lockValue, e);
            return false;
        }
    }

    /**
     * Reads the remaining time-to-live of the lock key.
     *
     * <p>Whatever {@code getExpire} reports is returned unchanged, including any negative
     * sentinel value the server uses for "no such key" / "no expiry".
     *
     * @param lockKey logical lock name
     * @return remaining TTL in seconds, or 0 if the template returned null or an error occurred
     */
    public long getLockTtl(String lockKey) {
        try {
            String key = LOCK_PREFIX + lockKey;
            Long ttl = redisTemplate.getExpire(key, TimeUnit.SECONDS);
            return ttl != null ? ttl : 0;
        } catch (Exception e) {
            log.error("获取锁TTL异常: lockKey={}", lockKey, e);
            return 0;
        }
    }

    /**
     * Checks whether the lock key currently exists, regardless of who owns it.
     *
     * @param lockKey logical lock name
     * @return {@code true} if the key exists; {@code false} if it does not, the template
     *         returned null, or an error occurred
     */
    public boolean isLockExists(String lockKey) {
        try {
            String key = LOCK_PREFIX + lockKey;
            // hasKey returns a Boolean object; avoid auto-unboxing a possible null.
            return Boolean.TRUE.equals(redisTemplate.hasKey(key));
        } catch (Exception e) {
            log.error("检查锁存在性异常: lockKey={}", lockKey, e);
            return false;
        }
    }
}

2.2 看门狗续约机制

看门狗续约服务的示例代码(Java)如下:
/**
* 看门狗续约服务
*/
@Component
public class WatchdogRenewalService {

@Autowired
private RedisDistributedLock redisDistributedLock;

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
private final Map<String, RenewalTask> renewalTasks = new ConcurrentHashMap<>();

/**
* 开始看门狗续约
* @param lockKey 锁的键
* @param lockValue 锁的值
* @param renewalInterval 续约间隔(秒)
* @param expireTime 过期时间(秒)
*/
public void startWatchdogRenewal(String lockKey, String lockValue,
long renewalInterval, long expireTime) {
String taskKey = lockKey + ":" + lockValue;

// 如果已经存在续约任务,先停止
stopWatchdogRenewal(lockKey, lockValue);

// 创建续约任务
RenewalTask renewalTask = new RenewalTask(lockKey, lockValue, renewalInterval, expireTime);
renewalTasks.put(taskKey, renewalTask);

// 启动续约任务
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
renewalTask,
renewalInterval,
renewalInterval,
TimeUnit.SECONDS
);

renewalTask.setScheduledFuture(future);

log.info("启动看门狗续约: lockKey={}, lockValue={}, interval={}s, expireTime={}s",
lockKey, lockValue, renewalInterval, expireTime);
}

/**
* 停止看门狗续约
* @param lockKey 锁的键
* @param lockValue 锁的值
*/
public void stopWatchdogRenewal(String lockKey, String lockValue) {
String taskKey = lockKey + ":" + lockValue;
RenewalTask renewalTask = renewalTasks.remove(taskKey);

if (renewalTask != null) {
renewalTask.cancel();
log.info("停止看门狗续约: lockKey={}, lockValue={}", lockKey, lockValue);
}
}

/**
* 续约任务
*/
private class RenewalTask implements Runnable {
private final String lockKey;
private final String lockValue;
private final long renewalInterval;
private final long expireTime;
private volatile boolean cancelled = false;
private ScheduledFuture<?> scheduledFuture;

public RenewalTask(String lockKey, String lockValue, long renewalInterval, long expireTime) {
this.lockKey = lockKey;
this.lockValue = lockValue;
this.renewalInterval = renewalInterval;
this.expireTime = expireTime;
}

@Override
public void run() {
if (cancelled) {
return;
}

try {
// 检查锁是否还存在
if (!redisDistributedLock.isLockExists(lockKey)) {
log.warn("锁已不存在,停止续约: lockKey={}, lockValue={}", lockKey, lockValue);
cancel();
return;
}

// 检查锁的值是否还是当前值
String currentValue = redisDistributedLock.getLockValue(lockKey);
if (!lockValue.equals(currentValue)) {
log.warn("锁的值已变化,停止续约: lockKey={}, expected={}, actual={}",
lockKey, lockValue, currentValue);
cancel();
return;
}

// 续约锁
boolean renewed = redisDistributedLock.renewLock(lockKey, lockValue, expireTime);
if (renewed) {
log.debug("锁续约成功: lockKey={}, lockValue={}, expireTime={}s",
lockKey, lockValue, expireTime);
} else {
log.warn("锁续约失败: lockKey={}, lockValue={}", lockKey, lockValue);
cancel();
}

} catch (Exception e) {
log.error("锁续约异常: lockKey={}, lockValue={}", lockKey, lockValue, e);
cancel();
}
}

public void cancel() {
this.cancelled = true;
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
}

public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
this.scheduledFuture = scheduledFuture;
}
}

@PreDestroy
public void shutdown() {
// 停止所有续约任务
renewalTasks.values().forEach(RenewalTask::cancel);
renewalTasks.clear();

// 关闭线程池
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(30, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

2.3 增强版Redis分布式锁

增强版Redis分布式锁的示例代码(Java)如下:
/**
 * Redis distributed lock with watchdog-based TTL renewal.
 *
 * <p>Keys live under {@code LOCK_PREFIX}. Acquisition sets the key if absent with an expiry
 * and then registers a renewal task with {@link WatchdogRenewalService}; release stops that
 * task and deletes the key only if it still holds the caller's owner token.
 *
 * <p>Methods catch exceptions, log them and signal failure via {@code null} / {@code false}.
 */
@Component
public class EnhancedRedisDistributedLock {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private WatchdogRenewalService watchdogRenewalService;

    private static final String LOCK_PREFIX = "enhanced:lock:";
    private static final String RENEWAL_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('expire', KEYS[1], ARGV[2]) " +
            "else " +
            " return 0 " +
            "end";
    private static final String UNLOCK_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            " return redis.call('del', KEYS[1]) " +
            "else " +
            " return 0 " +
            "end";

    // Shared script objects: a new DefaultRedisScript per call would recompute its SHA1 digest
    // every time.
    private static final DefaultRedisScript<Long> RENEWAL_REDIS_SCRIPT =
            new DefaultRedisScript<>(RENEWAL_SCRIPT, Long.class);
    private static final DefaultRedisScript<Long> UNLOCK_REDIS_SCRIPT =
            new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class);

    /**
     * Tries once (non-blocking) to acquire the lock and, on success, starts watchdog renewal.
     *
     * @param lockKey         logical lock name
     * @param lockValue       owner token
     * @param expireTime      TTL of the lock key, in seconds
     * @param renewalInterval delay between watchdog renewals, in seconds
     * @return a handle describing the held lock, or {@code null} if the lock is taken, a Redis
     *         error occurred, or the watchdog could not be started (the lock is released again
     *         in that last case)
     */
    public LockInfo tryLockWithWatchdog(String lockKey, String lockValue,
                                        long expireTime, long renewalInterval) {
        try {
            String key = LOCK_PREFIX + lockKey;
            Boolean success = redisTemplate.opsForValue()
                    .setIfAbsent(key, lockValue, Duration.ofSeconds(expireTime));

            if (!Boolean.TRUE.equals(success)) {
                log.debug("获取锁失败: lockKey={}, lockValue={}", lockKey, lockValue);
                return null;
            }
        } catch (Exception e) {
            log.error("获取锁异常: lockKey={}, lockValue={}", lockKey, lockValue, e);
            return null;
        }

        // From here on the key is held. Any failure must give it back; otherwise the caller
        // receives null (and therefore can never release) while the key blocks everyone else
        // until its TTL runs out.
        try {
            watchdogRenewalService.startWatchdogRenewal(
                    lockKey, lockValue, renewalInterval, expireTime);

            LockInfo lockInfo = new LockInfo(lockKey, lockValue, expireTime, renewalInterval);
            log.info("获取锁成功(带看门狗): lockKey={}, lockValue={}, expireTime={}s",
                    lockKey, lockValue, expireTime);
            return lockInfo;
        } catch (Exception e) {
            log.error("获取锁异常: lockKey={}, lockValue={}", lockKey, lockValue, e);
            watchdogRenewalService.stopWatchdogRenewal(lockKey, lockValue);
            releaseLock(lockKey, lockValue);
            return null;
        }
    }

    /**
     * Stops watchdog renewal and releases the lock described by {@code lockInfo}.
     *
     * @param lockInfo handle returned by {@link #tryLockWithWatchdog}; may be {@code null}
     * @return {@code true} if the key was deleted; {@code false} if {@code lockInfo} is null,
     *         the lock is no longer owned by this token, or an error occurred
     */
    public boolean releaseLockWithWatchdog(LockInfo lockInfo) {
        if (lockInfo == null) {
            return false;
        }

        try {
            // Stop renewing first so the watchdog cannot extend a key we are about to delete.
            watchdogRenewalService.stopWatchdogRenewal(
                    lockInfo.getLockKey(), lockInfo.getLockValue());

            boolean success = releaseLock(lockInfo.getLockKey(), lockInfo.getLockValue());

            if (success) {
                log.info("释放锁成功(停止看门狗): lockKey={}, lockValue={}",
                        lockInfo.getLockKey(), lockInfo.getLockValue());
            }

            return success;
        } catch (Exception e) {
            log.error("释放锁异常: lockKey={}, lockValue={}",
                    lockInfo.getLockKey(), lockInfo.getLockValue(), e);
            return false;
        }
    }

    /**
     * Re-applies the TTL if, and only if, the key still holds {@code lockValue}.
     *
     * @param lockKey    logical lock name
     * @param lockValue  owner token expected in the key
     * @param expireTime TTL to set, in seconds
     * @return {@code true} if the script reported 1 (TTL updated); {@code false} otherwise or
     *         on error
     */
    public boolean renewLock(String lockKey, String lockValue, long expireTime) {
        try {
            String key = LOCK_PREFIX + lockKey;

            Long result = redisTemplate.execute(RENEWAL_REDIS_SCRIPT,
                    Collections.singletonList(key), lockValue, String.valueOf(expireTime));

            return result != null && result == 1;
        } catch (Exception e) {
            log.error("续约锁异常: lockKey={}, lockValue={}", lockKey, lockValue, e);
            return false;
        }
    }

    /**
     * Reads the value currently stored under the lock key.
     *
     * @param lockKey logical lock name
     * @return the stored value, or {@code null} if the read returned nothing or failed
     */
    public String getLockValue(String lockKey) {
        try {
            String key = LOCK_PREFIX + lockKey;
            return redisTemplate.opsForValue().get(key);
        } catch (Exception e) {
            log.error("获取锁值异常: lockKey={}", lockKey, e);
            return null;
        }
    }

    /**
     * Deletes the lock key if it still holds {@code lockValue} (atomic compare-and-delete).
     *
     * @return {@code true} if the key was deleted
     */
    private boolean releaseLock(String lockKey, String lockValue) {
        try {
            String key = LOCK_PREFIX + lockKey;

            Long result = redisTemplate.execute(UNLOCK_REDIS_SCRIPT,
                    Collections.singletonList(key), lockValue);

            return result != null && result == 1;
        } catch (Exception e) {
            log.error("释放锁异常: lockKey={}, lockValue={}", lockKey, lockValue, e);
            return false;
        }
    }
}

/**
* Handle describing a Redis lock acquired with watchdog renewal.
* Accessors and the all-args constructor are generated by Lombok.
*/
@Data
@AllArgsConstructor
public class LockInfo {
/** Logical lock name as supplied by the caller. */
private String lockKey;
/** Owner token stored as the lock's value. */
private String lockValue;
/** Lock time-to-live, in seconds. */
private long expireTime;
/** Delay between watchdog renewals, in seconds. */
private long renewalInterval;
}

3. Zookeeper分布式锁实现

3.1 Zookeeper分布式锁

Zookeeper分布式锁的示例代码(Java)如下:
/**
* Zookeeper分布式锁实现
*/
@Component
public class ZookeeperDistributedLock {

private final CuratorFramework curatorFramework;
private final Map<String, InterProcessMutex> locks = new ConcurrentHashMap<>();

public ZookeeperDistributedLock() {
// 初始化Curator客户端
this.curatorFramework = CuratorFrameworkFactory.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();

this.curatorFramework.start();
}

/**
* 尝试获取锁
* @param lockPath 锁的路径
* @param timeout 超时时间(毫秒)
* @return 锁信息
*/
public ZookeeperLockInfo tryLock(String lockPath, long timeout) {
try {
InterProcessMutex mutex = locks.computeIfAbsent(lockPath,
path -> new InterProcessMutex(curatorFramework, "/locks/" + path));

boolean acquired = mutex.acquire(timeout, TimeUnit.MILLISECONDS);

if (acquired) {
ZookeeperLockInfo lockInfo = new ZookeeperLockInfo(lockPath, mutex);
log.info("获取Zookeeper锁成功: lockPath={}, timeout={}ms", lockPath, timeout);
return lockInfo;
} else {
log.debug("获取Zookeeper锁失败: lockPath={}, timeout={}ms", lockPath, timeout);
return null;
}
} catch (Exception e) {
log.error("获取Zookeeper锁异常: lockPath={}, timeout={}ms", lockPath, timeout, e);
return null;
}
}

/**
* 释放锁
* @param lockInfo 锁信息
* @return 是否释放成功
*/
public boolean releaseLock(ZookeeperLockInfo lockInfo) {
if (lockInfo == null || lockInfo.getMutex() == null) {
return false;
}

try {
lockInfo.getMutex().release();
locks.remove(lockInfo.getLockPath());

log.info("释放Zookeeper锁成功: lockPath={}", lockInfo.getLockPath());
return true;
} catch (Exception e) {
log.error("释放Zookeeper锁异常: lockPath={}", lockInfo.getLockPath(), e);
return false;
}
}

/**
* 检查锁是否存在
* @param lockPath 锁的路径
* @return 是否存在
*/
public boolean isLockExists(String lockPath) {
try {
String path = "/locks/" + lockPath;
return curatorFramework.checkExists().forPath(path) != null;
} catch (Exception e) {
log.error("检查Zookeeper锁存在性异常: lockPath={}", lockPath, e);
return false;
}
}

@PreDestroy
public void shutdown() {
try {
// 释放所有锁
locks.values().forEach(mutex -> {
try {
if (mutex.isAcquiredInThisProcess()) {
mutex.release();
}
} catch (Exception e) {
log.error("释放锁异常", e);
}
});
locks.clear();

// 关闭客户端
curatorFramework.close();
} catch (Exception e) {
log.error("关闭Zookeeper客户端异常", e);
}
}
}

/**
* Handle describing a held ZooKeeper lock.
* Accessors and the all-args constructor are generated by Lombok.
*/
@Data
@AllArgsConstructor
public class ZookeeperLockInfo {
/** Lock path as supplied by the caller (relative to the "/locks/" root used by ZookeeperDistributedLock). */
private String lockPath;
/** The Curator mutex that was acquired; needed to release the lock. */
private InterProcessMutex mutex;
}

4. 分布式锁管理器

4.1 统一锁管理器

统一锁管理器的示例代码(Java)如下:
/**
 * Facade that hides which backend (Redis or ZooKeeper) provides a distributed lock.
 */
@Service
public class DistributedLockManager {

    @Autowired
    private EnhancedRedisDistributedLock redisDistributedLock;

    @Autowired
    private ZookeeperDistributedLock zookeeperDistributedLock;

    /**
     * Supported lock backends.
     */
    public enum LockType {
        REDIS("redis", "Redis分布式锁"),
        ZOOKEEPER("zookeeper", "Zookeeper分布式锁");

        private final String type;
        private final String description;

        LockType(String type, String description) {
            this.type = type;
            this.description = description;
        }

        public String getType() {
            return type;
        }

        public String getDescription() {
            return description;
        }
    }

    /**
     * Acquires a lock from the chosen backend.
     *
     * <p>For ZooKeeper, {@code expireTime} (converted to milliseconds) is passed as the
     * acquisition wait timeout and {@code renewalInterval} is not used.
     *
     * @param lockKey         logical lock name
     * @param lockType        backend to use
     * @param expireTime      expiry in seconds
     * @param renewalInterval watchdog renewal interval in seconds (Redis only)
     * @return handle of the held lock, or {@code null} if it could not be acquired
     * @throws IllegalArgumentException for a lock type without a backend
     */
    public DistributedLockInfo acquireLock(String lockKey, LockType lockType,
                                           long expireTime, long renewalInterval) {
        final String ownerToken = generateLockValue();
        final Object backendHandle;

        switch (lockType) {
            case REDIS:
                backendHandle = redisDistributedLock.tryLockWithWatchdog(
                        lockKey, ownerToken, expireTime, renewalInterval);
                break;

            case ZOOKEEPER:
                backendHandle = zookeeperDistributedLock.tryLock(lockKey, expireTime * 1000);
                break;

            default:
                throw new IllegalArgumentException("不支持的锁类型: " + lockType);
        }

        // A null backend handle means the backend did not grant the lock.
        return backendHandle == null
                ? null
                : new DistributedLockInfo(lockKey, ownerToken, lockType, backendHandle);
    }

    /**
     * Releases a lock through the backend it was acquired from.
     *
     * @param lockInfo handle returned by {@link #acquireLock}; may be {@code null}
     * @return {@code true} if the backend reports a successful release
     */
    public boolean releaseLock(DistributedLockInfo lockInfo) {
        if (lockInfo == null) {
            return false;
        }

        final boolean released;
        switch (lockInfo.getLockType()) {
            case REDIS:
                released = redisDistributedLock.releaseLockWithWatchdog(
                        (LockInfo) lockInfo.getLockData());
                break;

            case ZOOKEEPER:
                released = zookeeperDistributedLock.releaseLock(
                        (ZookeeperLockInfo) lockInfo.getLockData());
                break;

            default:
                log.error("不支持的锁类型: {}", lockInfo.getLockType());
                released = false;
                break;
        }
        return released;
    }

    /**
     * Runs {@code businessLogic} while holding the lock and always releases afterwards.
     *
     * <p>Returns {@code null} without running the logic when the lock cannot be acquired, so a
     * {@code null} result is ambiguous if the logic itself may return {@code null}.
     *
     * @param lockKey         logical lock name
     * @param lockType        backend to use
     * @param expireTime      expiry in seconds
     * @param renewalInterval watchdog renewal interval in seconds
     * @param businessLogic   work to perform under the lock
     * @param <T>             result type of the work
     * @return the result of {@code businessLogic}, or {@code null} if the lock was not obtained
     */
    public <T> T executeWithLock(String lockKey, LockType lockType,
                                 long expireTime, long renewalInterval,
                                 Supplier<T> businessLogic) {
        final DistributedLockInfo heldLock =
                acquireLock(lockKey, lockType, expireTime, renewalInterval);

        if (heldLock != null) {
            try {
                log.info("获取锁成功,开始执行业务逻辑: lockKey={}, lockType={}", lockKey, lockType);
                return businessLogic.get();
            } finally {
                if (releaseLock(heldLock)) {
                    log.info("释放锁成功: lockKey={}, lockType={}", lockKey, lockType);
                } else {
                    log.error("释放锁失败: lockKey={}, lockType={}", lockKey, lockType);
                }
            }
        }

        log.warn("获取锁失败: lockKey={}, lockType={}", lockKey, lockType);
        return null;
    }

    /**
     * Builds a fresh owner token: a random UUID followed by ':' and the current epoch millis.
     *
     * @return the owner token
     */
    private String generateLockValue() {
        final String randomPart = UUID.randomUUID().toString();
        return randomPart + ":" + System.currentTimeMillis();
    }
}

/**
* Backend-independent handle for a lock obtained through DistributedLockManager.
* Accessors and the all-args constructor are generated by Lombok.
*/
@Data
@AllArgsConstructor
public class DistributedLockInfo {
/** Logical lock name as supplied by the caller. */
private String lockKey;
/** Owner token generated by the manager for this acquisition. */
private String lockValue;
/** Backend the lock was acquired from; decides how lockData is interpreted on release. */
private DistributedLockManager.LockType lockType;
/** Backend-specific handle: a LockInfo for REDIS, a ZookeeperLockInfo for ZOOKEEPER. */
private Object lockData;
}

5. 实际应用场景

5.1 定时任务防重复执行

定时任务防重复执行的示例代码(Java)如下:
/**
 * Runs scheduled jobs under a Redis distributed lock so that only the instance holding the
 * lock executes a given job.
 */
@Service
public class ScheduledTaskService {

    @Autowired
    private DistributedLockManager distributedLockManager;

    /**
     * Executes {@code taskLogic} while holding the lock {@code scheduled:task:<taskName>}.
     * If the lock cannot be obtained the task is skipped.
     *
     * @param taskName  name of the task; also used to derive the lock key
     * @param taskLogic work to run; exceptions it throws are logged and rethrown
     */
    public void executeScheduledTask(String taskName, Runnable taskLogic) {
        final String guardKey = "scheduled:task:" + taskName;
        final long ttlSeconds = 300;        // lock expires after 5 minutes
        final long renewEverySeconds = 60;  // watchdog renews every minute

        distributedLockManager.executeWithLock(
                guardKey,
                DistributedLockManager.LockType.REDIS,
                ttlSeconds,
                renewEverySeconds,
                () -> {
                    log.info("开始执行定时任务: {}", taskName);
                    try {
                        taskLogic.run();
                        log.info("定时任务执行完成: {}", taskName);
                        return null;
                    } catch (Exception e) {
                        log.error("定时任务执行失败: {}", taskName, e);
                        throw e;
                    }
                });
    }

    /**
     * Data synchronisation job, triggered every 5 minutes.
     */
    @Scheduled(fixedRate = 300000)
    public void dataSyncTask() {
        executeScheduledTask("data-sync", () -> {
            log.info("开始数据同步任务");
            // Placeholder for the real synchronisation work.
            pause(2000, "数据同步被中断");
            log.info("数据同步完成");
        });
    }

    /**
     * Cache clean-up job, triggered daily at 02:00.
     */
    @Scheduled(cron = "0 0 2 * * ?")
    public void cacheCleanupTask() {
        executeScheduledTask("cache-cleanup", () -> {
            log.info("开始缓存清理任务");
            // Placeholder for the real clean-up work.
            pause(3000, "缓存清理被中断");
            log.info("缓存清理完成");
        });
    }

    /**
     * Sleeps for {@code millis}; on interruption restores the interrupt flag and throws a
     * RuntimeException carrying {@code interruptedMessage}.
     */
    private static void pause(long millis, String interruptedMessage) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(interruptedMessage, e);
        }
    }
}

5.2 消息重复消费防护

消息重复消费防护的示例代码(Java)如下:
/**
 * Guards message handling against duplicate consumption.
 *
 * <p>A message counts as processed once a marker key ({@code MESSAGE_PREFIX + messageId})
 * exists in Redis. Handling runs under a per-message distributed lock, with the marker checked
 * both before and after the lock is taken.
 */
@Service
public class MessageDuplicatePreventionService {

    @Autowired
    private DistributedLockManager distributedLockManager;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    private static final String MESSAGE_PREFIX = "message:processed:";

    /**
     * Processes a message at most once per marker lifetime.
     *
     * @param messageId      unique id of the message
     * @param messageHandler handler invoked with the message id; its exceptions are logged and
     *                       rethrown
     * @return {@code true} if the message was already processed or was processed by this call;
     *         {@code false} if the lock could not be obtained
     */
    public boolean processMessage(String messageId, Consumer<String> messageHandler) {
        // 1. Cheap pre-check without taking the lock.
        if (isMessageProcessed(messageId)) {
            log.info("消息已处理,跳过: messageId={}", messageId);
            return true;
        }

        // 2. Serialise handling of this message across instances.
        String lockKey = "message:lock:" + messageId;

        Boolean handled = distributedLockManager.executeWithLock(
                lockKey,
                DistributedLockManager.LockType.REDIS,
                60, // lock expires after 1 minute
                20, // watchdog renews every 20 seconds
                () -> {
                    // 3. Re-check under the lock: another instance may have finished meanwhile.
                    if (isMessageProcessed(messageId)) {
                        log.info("消息已处理(双重检查),跳过: messageId={}", messageId);
                        return true;
                    }

                    // 4. Handle, then 5. record the marker.
                    try {
                        log.info("开始处理消息: messageId={}", messageId);
                        messageHandler.accept(messageId);

                        markMessageAsProcessed(messageId);

                        log.info("消息处理完成: messageId={}", messageId);
                        return true;

                    } catch (Exception e) {
                        log.error("消息处理失败: messageId={}", messageId, e);
                        throw e;
                    }
                }
        );

        // executeWithLock yields null when the lock was not obtained.
        return Boolean.TRUE.equals(handled);
    }

    /**
     * Checks whether the processed-marker for the message exists.
     *
     * @param messageId unique id of the message
     * @return {@code true} if the marker exists; {@code false} if it does not, the template
     *         returned null, or Redis failed (in which case the message is treated as
     *         unprocessed)
     */
    private boolean isMessageProcessed(String messageId) {
        try {
            String key = MESSAGE_PREFIX + messageId;
            // hasKey returns a Boolean object; avoid auto-unboxing a possible null.
            return Boolean.TRUE.equals(redisTemplate.hasKey(key));
        } catch (Exception e) {
            log.error("检查消息处理状态异常: messageId={}", messageId, e);
            return false;
        }
    }

    /**
     * Writes the processed-marker with a 24-hour expiry.
     *
     * <p>Best effort: a Redis failure is only logged, so the message may be handled again
     * later even though the handler already ran.
     *
     * @param messageId unique id of the message
     */
    private void markMessageAsProcessed(String messageId) {
        try {
            String key = MESSAGE_PREFIX + messageId;
            redisTemplate.opsForValue().set(key, "processed", Duration.ofHours(24));
        } catch (Exception e) {
            log.error("标记消息已处理异常: messageId={}", messageId, e);
        }
    }
}

5.3 业务逻辑防重复执行

业务逻辑防重复执行的示例代码(Java)如下:
/**
 * Wraps order, payment and inventory processing in per-entity Redis distributed locks so the
 * same entity is not processed concurrently by several instances.
 *
 * <p>Each method returns {@code null} when the lock cannot be obtained; processor exceptions
 * are logged and rethrown.
 */
@Service
public class BusinessLogicDuplicatePreventionService {

    @Autowired
    private DistributedLockManager distributedLockManager;

    /**
     * Processes an order under the lock {@code order:process:<orderId>}.
     *
     * @param orderId        id of the order
     * @param orderProcessor function performing the actual processing
     * @return the processor's result, or {@code null} if the lock was not obtained
     */
    public OrderProcessResult processOrder(String orderId, Function<String, OrderProcessResult> orderProcessor) {
        final String guardKey = "order:process:" + orderId;
        final long ttlSeconds = 300;        // lock expires after 5 minutes
        final long renewEverySeconds = 60;  // watchdog renews every minute

        return distributedLockManager.executeWithLock(
                guardKey, DistributedLockManager.LockType.REDIS, ttlSeconds, renewEverySeconds,
                () -> {
                    log.info("开始处理订单: orderId={}", orderId);
                    try {
                        final OrderProcessResult outcome = orderProcessor.apply(orderId);
                        log.info("订单处理完成: orderId={}, result={}", orderId, outcome);
                        return outcome;
                    } catch (Exception e) {
                        log.error("订单处理失败: orderId={}", orderId, e);
                        throw e;
                    }
                });
    }

    /**
     * Processes a payment under the lock {@code payment:process:<paymentId>}.
     *
     * @param paymentId        id of the payment
     * @param paymentProcessor function performing the actual processing
     * @return the processor's result, or {@code null} if the lock was not obtained
     */
    public PaymentProcessResult processPayment(String paymentId, Function<String, PaymentProcessResult> paymentProcessor) {
        final String guardKey = "payment:process:" + paymentId;
        final long ttlSeconds = 180;        // lock expires after 3 minutes
        final long renewEverySeconds = 30;  // watchdog renews every 30 seconds

        return distributedLockManager.executeWithLock(
                guardKey, DistributedLockManager.LockType.REDIS, ttlSeconds, renewEverySeconds,
                () -> {
                    log.info("开始处理支付: paymentId={}", paymentId);
                    try {
                        final PaymentProcessResult outcome = paymentProcessor.apply(paymentId);
                        log.info("支付处理完成: paymentId={}, result={}", paymentId, outcome);
                        return outcome;
                    } catch (Exception e) {
                        log.error("支付处理失败: paymentId={}", paymentId, e);
                        throw e;
                    }
                });
    }

    /**
     * Processes an inventory change under the lock {@code inventory:process:<productId>}.
     *
     * @param productId          id of the product
     * @param quantity           quantity handed to the processor
     * @param inventoryProcessor function performing the actual processing
     * @return the processor's result, or {@code null} if the lock was not obtained
     */
    public InventoryProcessResult processInventory(String productId, int quantity,
            BiFunction<String, Integer, InventoryProcessResult> inventoryProcessor) {
        final String guardKey = "inventory:process:" + productId;
        final long ttlSeconds = 60;         // lock expires after 1 minute
        final long renewEverySeconds = 20;  // watchdog renews every 20 seconds

        return distributedLockManager.executeWithLock(
                guardKey, DistributedLockManager.LockType.REDIS, ttlSeconds, renewEverySeconds,
                () -> {
                    log.info("开始处理库存: productId={}, quantity={}", productId, quantity);
                    try {
                        final InventoryProcessResult outcome =
                                inventoryProcessor.apply(productId, quantity);
                        log.info("库存处理完成: productId={}, quantity={}, result={}",
                                productId, quantity, outcome);
                        return outcome;
                    } catch (Exception e) {
                        log.error("库存处理失败: productId={}, quantity={}", productId, quantity, e);
                        throw e;
                    }
                });
    }
}

/**
* Result of processing an order.
* Accessors and the all-args constructor are generated by Lombok.
*/
@Data
@AllArgsConstructor
public class OrderProcessResult {
/** Whether processing succeeded. */
private boolean success;
/** Human-readable outcome message. */
private String message;
/** Id of the order this result belongs to. */
private String orderId;
}

/**
* Result of processing a payment.
* Accessors and the all-args constructor are generated by Lombok.
*/
@Data
@AllArgsConstructor
public class PaymentProcessResult {
/** Whether processing succeeded. */
private boolean success;
/** Human-readable outcome message. */
private String message;
/** Id of the payment this result belongs to. */
private String paymentId;
/** Payment amount (currency/unit not specified here — TODO confirm with the producer). */
private BigDecimal amount;
}

/**
* Result of processing an inventory change.
* Accessors and the all-args constructor are generated by Lombok.
*/
@Data
@AllArgsConstructor
public class InventoryProcessResult {
/** Whether processing succeeded. */
private boolean success;
/** Human-readable outcome message. */
private String message;
/** Id of the product this result belongs to. */
private String productId;
/** Quantity involved in the operation (presumably the requested quantity — verify against the producer). */
private int quantity;
/** Stock remaining afterwards (presumably after the change was applied — verify against the producer). */
private int remainingStock;
}

6. 监控和告警

6.1 锁监控服务

锁监控服务的示例代码(Java)如下:
/**
 * Collects statistics and health information about the Redis distributed locks.
 *
 * <p>NOTE(review): nothing in this class increments the counters or records the timer, so
 * the reported acquisition/release counts and hold time stay at their initial values unless
 * other code updates the same meters.
 */
@Service
public class DistributedLockMonitorService {

    /** Key pattern of the locks created by the enhanced Redis lock. */
    private static final String LOCK_KEY_PATTERN = "enhanced:lock:*";

    /** Locks whose remaining TTL exceeds this many seconds are reported as stuck. */
    private static final long DEAD_LOCK_TTL_THRESHOLD_SECONDS = 3600;

    /** TTL value reported for a key that exists but has no expiry set. */
    private static final long TTL_NO_EXPIRY = -1;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private DistributedLockManager distributedLockManager;

    private final MeterRegistry meterRegistry;
    private final Counter lockAcquisitionCounter;
    private final Counter lockReleaseCounter;
    private final Timer lockHoldTimeTimer;

    /**
     * Registers the lock meters with the given registry.
     *
     * @param meterRegistry registry the counters and timer are registered in
     */
    public DistributedLockMonitorService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.lockAcquisitionCounter = Counter.builder("distributed.lock.acquisition.total")
                .description("分布式锁获取总数")
                .register(meterRegistry);
        this.lockReleaseCounter = Counter.builder("distributed.lock.release.total")
                .description("分布式锁释放总数")
                .register(meterRegistry);
        this.lockHoldTimeTimer = Timer.builder("distributed.lock.hold.time")
                .description("分布式锁持有时间")
                .register(meterRegistry);
    }

    /**
     * Takes a snapshot of lock statistics.
     *
     * <p>On error the failure is logged and the partially filled snapshot is returned.
     *
     * @return current statistics
     */
    public LockStatistics getLockStatistics() {
        LockStatistics statistics = new LockStatistics();

        try {
            // NOTE(review): KEYS walks the whole keyspace; consider SCAN for large instances.
            Set<String> redisLocks = redisTemplate.keys(LOCK_KEY_PATTERN);
            statistics.setRedisLockCount(redisLocks != null ? redisLocks.size() : 0);

            statistics.setLockAcquisitionCount(lockAcquisitionCounter.count());
            statistics.setLockReleaseCount(lockReleaseCounter.count());
            statistics.setAverageHoldTime(lockHoldTimeTimer.mean(TimeUnit.MILLISECONDS));

        } catch (Exception e) {
            log.error("获取锁统计信息失败", e);
        }

        return statistics;
    }

    /**
     * Builds a health report: Redis reachability, acquisition/release success rates and the
     * list of locks that look stuck.
     *
     * <p>Overall health requires Redis to be reachable and both rates to exceed 0.9. If an
     * unexpected error occurs the report is marked unhealthy and carries the error message.
     *
     * @return the health report
     */
    public LockHealthReport checkLockHealth() {
        LockHealthReport report = new LockHealthReport();

        try {
            boolean redisHealthy = checkRedisHealth();
            report.setRedisHealthy(redisHealthy);

            double acquisitionRate = calculateLockAcquisitionRate();
            report.setLockAcquisitionRate(acquisitionRate);

            double releaseRate = calculateLockReleaseRate();
            report.setLockReleaseRate(releaseRate);

            List<String> deadLocks = detectDeadLocks();
            report.setDeadLocks(deadLocks);

            boolean overallHealthy = redisHealthy && acquisitionRate > 0.9 && releaseRate > 0.9;
            report.setOverallHealthy(overallHealthy);

        } catch (Exception e) {
            log.error("检查锁健康状态失败", e);
            report.setOverallHealthy(false);
            report.setErrorMessage(e.getMessage());
        }

        return report;
    }

    /**
     * Probes Redis with a simple read.
     *
     * @return {@code true} if the read completed without an exception
     */
    private boolean checkRedisHealth() {
        try {
            redisTemplate.opsForValue().get("health:check");
            return true;
        } catch (Exception e) {
            log.error("Redis健康检查失败", e);
            return false;
        }
    }

    /**
     * Lock acquisition success rate.
     *
     * @return a fixed placeholder value; a real calculation is not implemented yet
     */
    private double calculateLockAcquisitionRate() {
        return 0.95; // placeholder
    }

    /**
     * Lock release success rate.
     *
     * @return a fixed placeholder value; a real calculation is not implemented yet
     */
    private double calculateLockReleaseRate() {
        return 0.98; // placeholder
    }

    /**
     * Finds lock keys that look stuck.
     *
     * <p>A key is reported when it has no expiry at all (it would never release itself) or
     * when its remaining TTL is above {@link #DEAD_LOCK_TTL_THRESHOLD_SECONDS}.
     *
     * @return keys of the suspicious locks; empty if none were found or the scan failed
     */
    private List<String> detectDeadLocks() {
        List<String> deadLocks = new ArrayList<>();

        try {
            Set<String> locks = redisTemplate.keys(LOCK_KEY_PATTERN);
            if (locks != null) {
                for (String lock : locks) {
                    Long ttl = redisTemplate.getExpire(lock, TimeUnit.SECONDS);
                    if (ttl == null) {
                        continue;
                    }
                    // A lock without expiry is the one case the TTL safety net cannot rescue.
                    if (ttl == TTL_NO_EXPIRY || ttl > DEAD_LOCK_TTL_THRESHOLD_SECONDS) {
                        deadLocks.add(lock);
                    }
                }
            }
        } catch (Exception e) {
            log.error("检测死锁失败", e);
        }

        return deadLocks;
    }
}

/**
* Snapshot of distributed-lock statistics.
* Accessors are generated by Lombok.
*/
@Data
public class LockStatistics {
/** Number of Redis lock keys found when the snapshot was taken. */
private int redisLockCount;
/** Value of the lock-acquisition counter. */
private double lockAcquisitionCount;
/** Value of the lock-release counter. */
private double lockReleaseCount;
/** Mean lock hold time, in milliseconds. */
private double averageHoldTime;
/** Creation time of this snapshot. */
private LocalDateTime timestamp;

/** Creates an empty snapshot stamped with the current local date-time. */
public LockStatistics() {
this.timestamp = LocalDateTime.now();
}
}

/**
* Health report for the distributed-lock subsystem.
* Accessors are generated by Lombok.
*/
@Data
public class LockHealthReport {
/** Overall verdict combining Redis health and the success rates. */
private boolean overallHealthy;
/** Whether the Redis probe succeeded. */
private boolean redisHealthy;
/** Lock acquisition success rate (fraction between 0 and 1 — presumably; verify against the producer). */
private double lockAcquisitionRate;
/** Lock release success rate (fraction between 0 and 1 — presumably; verify against the producer). */
private double lockReleaseRate;
/** Keys of locks that were flagged as stuck. */
private List<String> deadLocks;
/** Error message when the health check itself failed; otherwise unset. */
private String errorMessage;
/** Creation time of this report. */
private LocalDateTime checkTime;

/** Creates an empty report stamped with the current local date-time. */
public LockHealthReport() {
this.checkTime = LocalDateTime.now();
}
}

7. 总结

通过分布式锁、看门狗续约和TTL兜底机制,我们成功构建了一个完整的防重复执行解决方案。关键特性包括:

7.1 核心优势

  1. 防重复执行: 有效防止跨实例重复执行
  2. 自动续约: 看门狗机制自动续约锁
  3. TTL兜底: 过期时间确保锁最终释放
  4. 多锁支持: 支持Redis和Zookeeper分布式锁
  5. 监控告警: 完善的锁监控和健康检查

7.2 最佳实践

  1. 锁粒度控制: 合理控制锁的粒度
  2. 超时设置: 设置合理的锁超时时间
  3. 续约间隔: 设置合适的续约间隔
  4. 异常处理: 完善的异常处理和资源清理
  5. 监控告警: 实时监控锁的状态和性能

这套防重复执行方案不仅能够有效解决分布式环境下的重复执行问题,还为系统的稳定性和可靠性提供了强有力的保障。