如果由于流量大而在短时间内几乎同时发出请求,或者由于服务器不稳定而需要很长时间来处理请求,并发问题可能会导致数据完整性问题。
示例问题情况
让我们假设有一个逻辑可以检索产品的库存并将库存减少一个,如上所述。此时,两个请求大致同时到达,以检索和减少库存。这里出现的问题是其他请求在完成减库存任务之前也可以查询库存,所以虽然有一个库存,但是两个请求都处理了,导致库存为零的意外结果。
因此,为了稳定地访问和控制共享资源,需要使用锁来限制在特定代码块中只能处理一个请求。
1.进程中的锁控制
各种编程语言通常会提供线程安全设施,以防止多个线程访问一个进程。在 Java 中,您可以通过向方法添加 synchronized 关键字或在代码中添加 synchronized 块来控制并发性。
代码示例:
@Synchronized
fun decrease(id: Long, quantity: Long) {
val stock: Stock = stockRepository.findByIdOrNull(id)
?: throw IllegalArgumentException("无法找到库存信息.")
stock.decrease(quantity);
}
坏处:
它在应用程序服务器不是由一个组成的多服务器环境中不能很好地工作。由于上述方法只能控制一个进程内的并发,所以在配置多个进程时也会出现同样的并发问题。
2.DB中的锁控制
有一种方法可以直接控制数据库中的锁,而不是在应用程序级别。
2.1. 悲观锁
这是在请求资源时会频繁出现并发问题的悲观预期下设置锁的方法。当一个事务正在访问数据时,禁止其他事务进行读写。许多 RBDMS(MySQL、Oracle、PostgreSQL 等)SELECT ~ FOR UPDATE
提供了一个功能,可以防止其他事务访问该行,直到更新完成并用 SQL 语句提交。
代码示例:
interface StockRepository : JpaRepository<Stock, Long> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
fun findStockById(id: Long): Stock?
}
如上面的例子,如果你使用 JPA,@Lock(LockModeType.PESSIMISTIC_WRITE)
你可以指定悲观锁。
-
LockModeType.PESSIMISTIC_WRITE: SELECT ~ FOR UPDATE query is executed (lock both read and write with exclusive lock)
-
LockModeType.PESSIMISTIC_READ: SELECT ~ FOR SHARE query is executed (write lock with shared lock)
坏处:
多表连接时存在死锁风险。
此外,由于频繁锁定会降低吞吐量,因此如果并发问题很少发生,则会降低性能。
2.2 乐观锁
乐观锁定是一种通过添加诸如版本之类的列来调整数据一致性的方法,而不是使用实际锁定。它乐观地判断并发问题不会频繁发生,所以所有的请求都在不加锁的情况下处理,只有当发现数据一致性问题时,才进行回滚以保证一致性。
发生并发问题时,如果现有的版本值已经在一个事务中被更新,则在另一个事务中无法检索到相应的版本值,因此不会发生更新。如果没有更新,则确定有问题,进行回滚处理,保证数据的一致性。
坏处:
如果并发问题频繁发生,则可能会频繁回滚。
3.使用Redis控制锁
使用 Redis,即使在具有多个应用程序的多服务器环境中,也可以有效地使用锁。
3.1. 使用 SETNX 命令的自旋锁
Redis是一种在键不存在时使用SETNX
“ SET if N ot e Xists ”命令设置值的方法。这允许您实现一个自旋锁,将特定键设置为锁,并在锁已被使用时定期请求获取锁。
在上面的示例中SET stock-id "lock" NX EX 3
,如果 stock-id 键不存在,则设置一个 3 秒后过期的“锁定”值。
应用会不断发出请求,直到获取到stock-id key,用到时删除key。
(SETNX 命令已弃用,因此建议使用 SET 命令和 NX 选项。)
代码示例:
@Component
class RedisLockRepository(
private val redisTemplate: RedisTemplate<String, String>,
) {
fun lock(key: Long): Boolean {
val isSuccess: Boolean? = redisTemplate.opsForValue()
.setIfAbsent(key.toString(), "lock", Duration.ofSeconds(3L))
return isSuccess ?: false
}
fun unlock(key: Long): Boolean {
return redisTemplate.delete(key.toString())
}
}
@Service
class RedisLockStockService(
private val redisLockRepository: RedisLockRepository,
) {
fun decrease(id: Long, quantity: Long) {
while (redisLockRepository.lock(id).not()) {
TimeUnit.MILLISECONDS.sleep(100L) // 100 milliseconds sleep
}
try {
...
} finally {
redisLockRepository.unlock(id)
}
}
}
坏处:
实现很简单,但是它给 Redis 服务器增加了负载,因为它会不断尝试请求直到获得锁。
3.2. 使用Redisson的分布式锁
Redisson是一个帮助 Redis 高效处理分布式锁的开源软件。它提供了一个功能,可以通过使用 pub/sub 和 Lua 脚本来有效地处理分布式锁。
发布/订阅功能
Redisson 使用 Redis 的发布/订阅功能来等待订阅频道上的消息,直到获得锁。然后,当发生解锁并发布频道消息时,尝试获取锁。
使用 Lua 脚本执行原子命令
Redis 可以使用 Lua 脚本原子地执行一组命令,Redisson 也利用它来原子地执行一组锁定和解锁命令。
上面的例子是Redisson的解锁流程的一段代码。
1. 检索具有指定散列键和递减 1 的锁。
2. 如果计数器结果为0,则删除key。
3.使用publish命令发送频道的消息。
当使用上述函数解锁时,通过订阅从另一个线程接收消息,并且可以尝试获取锁。
@Service
class RedissonLockStockFacade(
private val stockService: StockService,
private val redissonClient: RedissonClient,
) {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
fun decrease(id: Long, quantity: Long) {
val lock: RLock = redissonClient.getLock(id.toString())
try {
val available: Boolean = lock.tryLock(5L, 1L, TimeUnit.SECONDS)
if (available.not()) {
log.error("lock ")
return
}
stockService.decrease(id, quantity)
} finally {
lock.unlock()
}
}
}