Java分布式锁实战指南:从理论到实践
前言
在分布式系统中,传统的单机锁机制无法满足跨进程、跨机器的同步需求。分布式锁应运而生,成为保证分布式系统数据一致性的关键技术。本文将全面介绍Java中分布式锁的实现方式和最佳实践。
1. 分布式锁的核心概念
1.1 为什么需要分布式锁?
在分布式环境中,多个服务实例可能同时访问共享资源,需要一种跨JVM的同步机制:
// 传统单机锁在分布式环境中失效
public class OrderService {private final Object lock = new Object(); // 只在当前JVM有效public void createOrder() {synchronized(lock) {// 在分布式环境中,其他节点的线程仍然可以同时执行}}
}
1.2 分布式锁的基本要求
- 互斥性:同一时刻只有一个客户端能持有锁
- 可重入性:同一个客户端可以多次获取同一把锁
- 超时机制:避免死锁,自动释放过期锁
- 高可用:锁服务需要高可用性
- 高性能:获取和释放锁的操作要高效
2. 基于数据库的分布式锁
2.1 基于唯一索引的实现
// 数据库表结构
CREATE TABLE distributed_lock (id BIGINT PRIMARY KEY AUTO_INCREMENT,lock_key VARCHAR(64) NOT NULL UNIQUE,lock_value VARCHAR(255) NOT NULL,expire_time DATETIME NOT NULL,create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);
// 基于MySQL的分布式锁实现
public class MySQLDistributedLock {private final DataSource dataSource;private final String lockKey;private final String lockValue;public boolean tryLock(long expireMillis) {try (Connection conn = dataSource.getConnection()) {String sql = "INSERT INTO distributed_lock (lock_key, lock_value, expire_time) " +"VALUES (?, ?, DATE_ADD(NOW(), INTERVAL ? MILLISECOND)) " +"ON DUPLICATE KEY UPDATE " +"lock_value = IF(expire_time < NOW(), VALUES(lock_value), lock_value), " +"expire_time = IF(expire_time < NOW(), VALUES(expire_time), expire_time)";PreparedStatement ps = conn.prepareStatement(sql);ps.setString(1, lockKey);ps.setString(2, lockValue);ps.setLong(3, expireMillis);return ps.executeUpdate() > 0;} catch (SQLException e) {return false;}}public void unlock() {try (Connection conn = dataSource.getConnection()) {String sql = "DELETE FROM distributed_lock WHERE lock_key = ? AND lock_value = ?";PreparedStatement ps = conn.prepareStatement(sql);ps.setString(1, lockKey);ps.setString(2, lockValue);ps.executeUpdate();} catch (SQLException e) {// 日志记录}}
}
2.2 优缺点分析
优点:
- 实现简单,依赖少
- 理解容易,适合小型项目
缺点:
- 性能瓶颈,数据库压力大
- 非阻塞操作实现复杂
- 需要处理数据库连接问题
3. 基于Redis的分布式锁
3.1 使用Redisson客户端
<!-- Maven依赖 -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.17.0</version>
</dependency>
// Redisson分布式锁示例
public class RedisDistributedLockExample {private final RedissonClient redisson;public void performTask() {RLock lock = redisson.getLock("myDistributedLock");try {// 尝试获取锁,最多等待10秒,锁过期时间30秒boolean isLocked = lock.tryLock(10, 30, TimeUnit.SECONDS);if (isLocked) {// 执行业务逻辑executeBusinessLogic();}} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}private void executeBusinessLogic() {// 业务代码}
}
3.2 手动实现Redis分布式锁
public class ManualRedisLock {private final JedisPool jedisPool;private static final String LOCK_SCRIPT = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +"return redis.call('pexpire', KEYS[1], ARGV[2]) " +"else return 0 end";private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else return 0 end";public boolean tryLock(String lockKey, String lockValue, long expireMillis) {try (Jedis jedis = jedisPool.getResource()) {Object result = jedis.eval(LOCK_SCRIPT, Collections.singletonList(lockKey),Arrays.asList(lockValue, String.valueOf(expireMillis)));return "1".equals(result.toString());}}public boolean unlock(String lockKey, String lockValue) {try (Jedis jedis = jedisPool.getResource()) {Object result = jedis.eval(UNLOCK_SCRIPT,Collections.singletonList(lockKey),Collections.singletonList(lockValue));return "1".equals(result.toString());}}
}
4. 基于ZooKeeper的分布式锁
4.1 Curator框架实现
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.3.0</version>
</dependency>
public class ZookeeperDistributedLock {private final CuratorFramework client;private final String lockPath;public void executeWithLock() {InterProcessMutex lock = new InterProcessMutex(client, lockPath);try {if (lock.acquire(10, TimeUnit.SECONDS)) {try {// 获得锁后的业务逻辑processBusiness();} finally {lock.release();}}} catch (Exception e) {// 处理异常}}private void processBusiness() {// 业务处理}
}
4.2 ZooKeeper锁原理
ZooKeeper通过临时顺序节点实现分布式锁:
- 客户端在锁目录下创建临时顺序节点
- 检查自己是否是最小序号的节点
- 如果是,获得锁;如果不是,监听前一个节点
- 完成操作后删除节点
5. Spring Boot整合分布式锁
5.1 基于Spring的分布式锁抽象
@Component
public class DistributedLockManager {@Autowiredprivate RedissonClient redissonClient;public <T> T executeWithLock(String lockKey, long waitTime, long leaseTime, Supplier<T> supplier) {RLock lock = redissonClient.getLock(lockKey);try {if (lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {return supplier.get();}throw new RuntimeException("获取锁失败");} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("锁获取被中断", e);} finally {if (lock.isHeldByCurrentThread()) {lock.unlock();}}}
}
5.2 注解方式使用分布式锁
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface DistributedLock {String key(); // 锁的keylong waitTime() default 5000; // 等待时间long leaseTime() default 30000; // 持有时间
}
@Aspect
@Component
public class DistributedLockAspect {@Autowiredprivate DistributedLockManager lockManager;@Around("@annotation(distributedLock)")public Object around(ProceedingJoinPoint joinPoint, DistributedLock distributedLock) throws Throwable {String lockKey = distributedLock.key();return lockManager.executeWithLock(lockKey, distributedLock.waitTime(),distributedLock.leaseTime(),() -> {try {return joinPoint.proceed();} catch (Throwable throwable) {throw new RuntimeException(throwable);}});}
}
6. 分布式锁的最佳实践
6.1 锁key的设计原则
public class LockKeyGenerator {public static String generateLockKey(String prefix, String businessKey) {return String.format("lock:%s:%s", prefix, businessKey);}public static String generateOrderLockKey(Long orderId) {return generateLockKey("order", String.valueOf(orderId));}
}
6.2 异常处理和重试机制
public class LockRetryTemplate {private final int maxRetries;private final long retryInterval;public <T> T executeWithRetry(Callable<T> task, String lockKey) {int retries = 0;while (retries < maxRetries) {try {return task.call();} catch (LockAcquisitionException e) {retries++;if (retries >= maxRetries) {throw new RuntimeException("获取锁重试次数超限", e);}try {Thread.sleep(retryInterval);} catch (InterruptedException ie) {Thread.currentThread().interrupt();throw new RuntimeException("重试被中断", ie);}} catch (Exception e) {throw new RuntimeException("业务执行异常", e);}}throw new RuntimeException("未知异常");}
}
6.3 监控和告警
@Component
public class LockMonitor {private final MeterRegistry meterRegistry;@EventListenerpublic void onLockEvent(LockEvent event) {meterRegistry.counter("distributed.lock.operation", "type", event.getType().name(),"success", String.valueOf(event.isSuccess())).increment();if (!event.isSuccess()) {// 发送告警sendAlert(event);}}private void sendAlert(LockEvent event) {// 实现告警逻辑}
}
7. 不同场景下的选择建议
7.1 技术选型对比
方案 | 性能 | 可靠性 | 实现复杂度 | 适用场景 |
---|---|---|---|---|
数据库锁 | 低 | 高 | 低 | 低频操作,数据一致性要求高 |
Redis锁 | 高 | 中 | 中 | 高频操作,允许偶尔失败 |
ZooKeeper锁 | 中 | 高 | 高 | 强一致性要求,复杂锁场景 |
7.2 推荐方案
- 一般业务场景:Redis + Redisson
- 金融级一致性:ZooKeeper + Curator
- 简单低频场景:数据库实现
- 云原生环境:使用云服务商提供的分布式锁服务
8. 常见问题及解决方案
8.1 锁过期时间设置
// 动态调整锁超时时间
public class AdaptiveLockTimeout {private long baseTimeout = 30000; // 基础超时30秒private long maxTimeout = 120000; // 最大超时2分钟public long calculateTimeout(String businessType) {// 根据业务类型和历史执行时间动态计算超时long estimatedTime = estimateExecutionTime(businessType);return Math.min(baseTimeout + estimatedTime * 2, maxTimeout);}
}
8.2 避免死锁
// 锁超时自动释放
public class SafeDistributedLock {public boolean tryLockWithTimeout(String lockKey, long timeout) {long start = System.currentTimeMillis();while (System.currentTimeMillis() - start < timeout) {if (tryAcquireLock(lockKey)) {return true;}try {Thread.sleep(100); // 短暂等待} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}return false;}
}
总结
分布式锁是分布式系统中的重要组件,选择合适的技术方案需要综合考虑性能、可靠性、复杂度等因素。建议:
- 优先使用成熟框架如Redisson或Curator
- 合理设计锁粒度,避免过度使用分布式锁
- 实现完善的监控,及时发现和处理锁问题
- 考虑最终一致性方案,减少对分布式锁的依赖