Redis实现分布式锁的原理
Redis分布式锁基于其单线程执行命令的特性,通过原子操作实现多节点间的互斥访问。下面从原理、实现、问题及优化四个方面详细解析:
1.原子性与互斥性
Redis分布式锁的核心是原子性操作:
-
获取锁:使用
SET key value NX EX timeout
命令NX
(Not eXists):仅当key不存在时设置成功EX timeout
:设置过期时间,防止死锁- 原子性:Redis单线程执行命令,确保多客户端并发请求时只有一个能成功
-
释放锁:先验证锁持有者再删除
- 必须使用Lua脚本保证原子性,避免误删其他线程的锁
-- 释放锁的Lua脚本
if redis.call('get', KEYS[1]) == ARGV[1] thenreturn redis.call('del', KEYS[1])
elsereturn 0
end
2.分布式锁的实现步骤
1. 获取锁流程:
- 客户端生成唯一标识(如UUID)作为锁的值
- 执行
SET lock_key unique_id NX EX 10
(10秒过期) - 返回
OK
表示获取锁成功,否则失败
2. 释放锁流程:
- 客户端携带锁的唯一标识调用Lua脚本
- 脚本先检查锁的值是否与传入标识一致
- 一致则删除锁,返回1;不一致返回0
示例
1. 添加依赖
在pom.xml
中添加Spring Data Redis依赖:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2. 配置Redis连接
在application.yml
中配置Redis服务器信息:
spring:redis:host: localhostport: 6379password: yourpassword # 如果有密码timeout: 5000mslettuce:pool:max-active: 8max-wait: -1msmax-idle: 8min-idle: 0
3. 创建分布式锁接口
定义锁的基本操作:
public interface RedisLock {/*** 尝试获取锁* @param lockKey 锁的键* @param requestId 请求标识(用于释放锁时校验)* @param expireTime 锁的过期时间* @param timeUnit 时间单位* @return 是否成功获取锁*/boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit);/*** 释放锁* @param lockKey 锁的键* @param requestId 请求标识* @return 是否成功释放锁*/boolean releaseLock(String lockKey, String requestId);
}
4. 实现分布式锁(重点)
使用RedisTemplate
实现锁操作,关键在于:
- 获取锁:使用
setIfAbsent
原子操作 - 释放锁:使用Lua脚本保证原子性
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;import java.util.Collections;
import java.util.concurrent.TimeUnit;@Component
public class RedisLockImpl implements RedisLock {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;// 释放锁的Lua脚本:先验证锁的持有者,再删除锁private static final DefaultRedisScript<Long> RELEASE_LOCK_SCRIPT;static {RELEASE_LOCK_SCRIPT = new DefaultRedisScript<>();RELEASE_LOCK_SCRIPT.setScriptText("if redis.call('get', KEYS[1]) == ARGV[1] then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end");RELEASE_LOCK_SCRIPT.setResultType(Long.class);}@Overridepublic boolean tryLock(String lockKey, String requestId, long expireTime, TimeUnit timeUnit) {// 核心方法:原子性地设置锁和过期时间Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime, timeUnit);return result != null && result;}@Overridepublic boolean releaseLock(String lockKey, String requestId) {// 使用Lua脚本保证原子性Long result = redisTemplate.execute(RELEASE_LOCK_SCRIPT,Collections.singletonList(lockKey),requestId);return result != null && result == 1L;}
}
5. 使用分布式锁
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.UUID;
import java.util.concurrent.TimeUnit;@Service
public class OrderService {@Autowiredprivate RedisLock redisLock;public void createOrder(String orderId) {String lockKey = "order-lock:" + orderId;String requestId = UUID.randomUUID().toString();boolean locked = false;try {// 尝试获取锁,设置过期时间为10秒locked = redisLock.tryLock(lockKey, requestId, 10, TimeUnit.SECONDS);if (locked) {// 获得锁成功,执行关键业务逻辑System.out.println("获取锁成功,开始处理订单: " + orderId);// 模拟业务处理Thread.sleep(2000);} else {// 获得锁失败,处理失败逻辑System.out.println("获取锁失败,稍后重试或执行其他策略");}} catch (Exception e) {e.printStackTrace();} finally {// 无论如何都尝试释放锁,确保不会死锁if (locked) {redisLock.releaseLock(lockKey, requestId);}}}
}
setIfAbsent
方法
RedisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit)
是实现分布式锁的核心方法,它对应Redis的命令:
SET key value NX EX timeout
关键点:
-
原子性:该方法会原子性地完成三个操作:
- 检查key是否存在
- 如果不存在,则设置key的值
- 同时设置key的过期时间
-
防止死锁:
- 必须设置过期时间,确保即使持有锁的进程崩溃,锁也会自动释放
- 过期时间不宜过短(避免业务未完成锁就过期)或过长(影响性能)
-
唯一标识:
- value使用唯一的requestId(如UUID),用于标识锁的持有者
- 释放锁时必须验证requestId,防止误删其他线程的锁
释放锁的原子性问题
释放锁时不能简单地直接删除key,必须先验证锁的持有者:
// 错误示例(非原子操作,有竞态条件)
if (redis.get(lockKey).equals(requestId)) {redis.delete(lockKey);
}// 正确方式:使用Lua脚本保证原子性
Long result = redisTemplate.execute(RELEASE_LOCK_SCRIPT, Collections.singletonList(lockKey), requestId);