🎯 本文介绍了一种优化预订接口的方法,通过引入本地消息表解决分布式事务中的最终一致性问题。原先的实现是在一个事务中同时扣减库存和创建订单,容易因网络不稳定导致数据不一致。改进后的方法将业务操作和消息发送封装在本地事务中,并利用MQ进行异步解耦,确保了即使在网络故障时也能保证系统的数据一致性。此外,还设计了定时任务重试机制以及幂等性保障措施来进一步确保消息被成功处理,从而实现了高效可靠的分布式事务处理。

说明

在前面的预订实现中,是先开启一个事务,然后去扣减库存,再通过RPC调用订单服务来创建订单,如果订单创建成功,就提交事务;否则回滚事务。代码实现如下:

/*** 执行下单和数据库库存扣减操作** @param timePeriodDO* @param courtIndex* @param venueId* @return*/
@Override
public OrderDO executePreserveV1(TimePeriodDO timePeriodDO,Long courtIndex, Long venueId,String stockKey, String freeIndexBitMapKey) {// 编程式开启事务,减少事务粒度,避免长事务的发生return transactionTemplate.execute(status -> {try {// 扣减当前时间段的库存,修改空闲场信息baseMapper.updateStockAndBookedSlots(timePeriodDO.getId(), timePeriodDO.getPartitionId(), courtIndex);// 调用远程服务创建订单OrderGenerateReqDTO orderGenerateReqDTO = OrderGenerateReqDTO.builder().timePeriodId(timePeriodDO.getId()).partitionId(timePeriodDO.getPartitionId()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).courtIndex(courtIndex).userId(UserContext.getUserId()).userName(UserContext.getUsername()).venueId(venueId).payAmount(timePeriodDO.getPrice()).build();Result<OrderDO> result;try {result = orderFeignService.generateOrder(orderGenerateReqDTO);if (result == null || !result.isSuccess()) {// --if-- 订单生成失败,抛出异常,上面的库存扣减也会回退throw new ServiceException(BaseErrorCode.ORDER_GENERATE_ERROR);}} catch (Exception e) {// --if-- 订单生成服务调用失败// 恢复缓存中的信息this.restoreStockAndBookedSlotsCache(timePeriodDO.getId(),UserContext.getUserId(),courtIndex,stockKey,freeIndexBitMapKey);// todo 如果说由于网络原因,实际上订单已经创建成功了,但是因为超时访问失败,这里库存却回滚了,此时需要将订单置为废弃状态(即删除)// 发送一个短暂的延时消息(时间过长,用户可能已经支付),去检查订单是否生成,如果生成,将其删除// 打印错误堆栈信息e.printStackTrace();// 把错误返回到前端throw new ServiceException(e.getMessage());}return result.getData();} catch (Exception ex) {status.setRollbackOnly();throw ex;}});
}

但是网络有时候是不稳定的,假如订单服务创建订单成功,但是由于网络原因,没办法将订单数据返回给库存服务。这时候库存服务就会误认为订单服务出错,进而回滚了事务。这样,就出现了订单创建成功,但是库存却没有扣减,出现了不一致问题,这种不一致会导致超卖。

由于库存扣减、订单生成处于不同的服务中,双方无法使用本地事务来保证两者的一致性,这属于分布式事务。常见的分布式事务解决方案有:

  • 强一致:2PC、3PC、TCC、Saga模式
  • 最终一致:本地消息表、MQ事务消息、最大努力通知
  • 工具:Seata

本文使用比较常用的本地消息表来解决

本地消息表介绍

本地消息表的核心思想:将分布式事务拆分为本地事务+异步消息,通过本地事务保证消息的可靠存储,通过重试机制确保远程业务最终执行成功。

核心步骤

  1. 本地事务与消息写入 业务执行时,先在本地数据库完成业务操作,同时将待发送的消息(含业务ID、状态等)插入同一事务的消息表,利用本地事务的ACID特性保证两者原子性。
  2. 异步轮询消息 后台定时任务扫描消息表中状态为"待发送"的消息,调用下游服务的接口。
  3. 下游服务处理 下游服务执行业务逻辑,成功后返回确认;若失败或超时,触发重试(需保证接口幂等性)。
  4. 消息状态更新 下游处理成功后,更新本地消息表中该消息状态为"已完成";若多次重试失败则标记为"失败",人工介入处理。

关键点

  • 可靠性:消息表与业务数据同库,本地事务确保业务执行成功,本地消息就会记录成功
  • 异步解耦:通过异步重试替代同步阻塞,提高系统吞吐量
  • 幂等性:下游服务调用要支持幂等性,不然重复消费可能出问题

本文实践过程

  • 预订接口首先通过缓存验证用户是否预订成功,预订成功就发送一条预订消息到MQ
  • 订单服务去消费预订消息,通过本地事务保证插入订单、插入本地消息的原子性
  • 通过定时任务轮询本地消息表中还没有执行成功的消息,将任务投递到MQ中,后面让库存服务去消费,进行库存扣减(当然这里也可以直接通过RPC调用库存服务扣减,但是为了解耦两个服务,本文使用MQ来实现)
  • 注意:库存服务执行库存扣减的时候,需要保证幂等性。即一个订单扣减过库存之后,不允许再扣减第二次。

数据库设计

首先需要创建一个表,用来记录本地消息

CREATE TABLE `local_message` (`id` bigint NOT NULL COMMENT '主键ID',`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',`is_deleted` tinyint NOT NULL DEFAULT '0' COMMENT '逻辑删除 0:未删除 1:已删除',`msg_id` varchar(64) NOT NULL COMMENT '唯一消息ID',`topic` varchar(200) NOT NULL COMMENT '消息Topic',`tag` varchar(200) NOT NULL DEFAULT '' COMMENT '消息Tag',`content` text NOT NULL COMMENT '消息内容(JSON格式)',`status` tinyint NOT NULL DEFAULT '0' COMMENT '消息状态 0:待发送 1:消费失败 2:消费成功 3:超过重试次数',`fail_reason` varchar(1000) DEFAULT NULL COMMENT '失败原因',`retry_count` int NOT NULL DEFAULT '0' COMMENT '已重试次数',`next_retry_time` bigint NOT NULL DEFAULT '0' COMMENT '下次重试时间戳(毫秒)',`max_retry_count` int NOT NULL DEFAULT '3' COMMENT '最大重试次数',PRIMARY KEY (`id`),UNIQUE KEY `uk_msg_id` (`msg_id`),KEY `idx_status_retry` (`status`, `next_retry_time`),KEY `idx_topic_tag` (`topic`, `tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='本地消息表';

业务实现

枚举

package com.vrs.enums;import lombok.Getter;
import lombok.RequiredArgsConstructor;/*** 场馆类型枚举*/
@RequiredArgsConstructor
public enum LocalMessageStatusEnum {INIT(0, "待发送"),SEND_FAIL(1, "消费失败"),SEND_SUCCESS(2, "消费成功"),ARRIVE_MAX_RETRY_COUNT(3, "超过重试次数"),;@Getterprivate final int status;@Getterprivate final String msg;}

预订

首先验证令牌是否充足,充足就发送一条预订消息到 MQ

/*** 尝试获取令牌,令牌获取成功之后,发送消息,异步执行库存扣减和订单生成* 注意:令牌在极端情况下,如扣减令牌之后,服务宕机了,此时令牌的库存是小于真实库存的* 如果查询令牌发现库存为0,尝试去数据库中加载数据,加载之后库存还是0,说明时间段确实售罄了* 使用消息队列异步 扣减库存,更新缓存,生成订单** @param timePeriodId* @param courtIndex*/
@Override
public String reserve2(Long timePeriodId, Integer courtIndex) { 参数校验:使用责任链模式校验数据是否正确TimePeriodReserveReqDTO timePeriodReserveReqDTO = new TimePeriodReserveReqDTO(timePeriodId, courtIndex);chainContext.handler(ChainConstant.RESERVE_CHAIN_NAME, timePeriodReserveReqDTO);Long venueId = timePeriodReserveReqDTO.getVenueId();VenueDO venueDO = timePeriodReserveReqDTO.getVenueDO();PartitionDO partitionDO = timePeriodReserveReqDTO.getPartitionDO();TimePeriodDO timePeriodDO = timePeriodReserveReqDTO.getTimePeriodDO(); 使用lua脚本获取一个空场地对应的索引,并扣除相应的库存,同时在里面进行用户的查重// 首先检测空闲场号缓存有没有加载好,没有的话进行加载this.checkBitMapCache(String.format(RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId()),timePeriodId,partitionDO.getNum());// 其次检测时间段库存有没有加载好,没有的话进行加载this.getStockByTimePeriodId(RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY, timePeriodReserveReqDTO.getTimePeriodId());// todo 判断是否还有令牌,没有的话,重新加载(注意要分布式锁)// 执行lua脚本Long freeCourtIndex = executeStockReduceByLua(timePeriodReserveReqDTO,venueDO,courtIndex, RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);if (freeCourtIndex == -2L) {// --if-- 用户已经购买过该时间段throw new ClientException(BaseErrorCode.TIME_PERIOD_HAVE_BOUGHT_ERROR);} else if (freeCourtIndex == -1L) {// --if-- 没有空闲的场号,查询数据库,如果数据库中有库存,删除缓存,下一个用户预定时重新加载令牌this.refreshTokenByCheckDataBase(timePeriodId);throw new ServiceException(BaseErrorCode.TIME_PERIOD_SELL_OUT_ERROR);} 发送消息,异步更新库存并生成订单String orderSn = SnowflakeIdUtil.nextId() + String.valueOf(UserContext.getUserId() % 1000000);SendResult sendResult = executeReserveProducer.sendMessage(ExecuteReserveMqDTO.builder().orderSn(orderSn).timePeriodId(timePeriodId).courtIndex(freeCourtIndex).venueId(venueId).userId(UserContext.getUserId()).userName(UserContext.getUsername()).partitionId(partitionDO.getId()).price(timePeriodDO.getPrice()).periodDate(timePeriodDO.getPeriodDate()).beginTime(timePeriodDO.getBeginTime()).endTime(timePeriodDO.getEndTime()).build());if (!sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {log.error("消息发送失败: " + sendResult.getSendStatus());// 恢复令牌缓存this.restoreStockAndBookedSlotsCache(timePeriodId,UserContext.getUserId(),freeCourtIndex,RedisCacheConstant.VENUE_TIME_PERIOD_STOCK_TOKEN_KEY,RedisCacheConstant.VENUE_TIME_PERIOD_FREE_INDEX_BIT_MAP_TOKEN_KEY);throw new ServiceException(BaseErrorCode.MQ_SEND_ERROR);}return orderSn;
}

订单生成

消息预订消息,执行订单创建,并插入本地事务

/*** 消费预订之后的消息* 生成订单、生成本地消息** @param message*/
@Override
public void generateOrder(ExecuteReserveMqDTO message) {OrderDO orderDO = OrderDO.builder()// 订单号使用雪花算法生成分布式ID,然后再拼接用户ID的后面六位.orderSn(message.getOrderSn()).orderTime(new Date()).venueId(message.getVenueId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).timePeriodId(message.getTimePeriodId()).periodDate(message.getPeriodDate()).beginTime(message.getBeginTime()).endTime(message.getEndTime()).userId(message.getUserId()).userName(message.getUserName()).payAmount(message.getPrice()).orderStatus(OrderStatusConstant.UN_PAID).build();TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO = TimePeriodStockReduceMqDTO.builder().orderSn(message.getOrderSn()).timePeriodId(message.getTimePeriodId()).partitionId(message.getPartitionId()).courtIndex(message.getCourtIndex()).build();LocalMessageDO stockReduceLocalMessageDO = LocalMessageDO.builder().msgId(message.getOrderSn()).topic(RocketMqConstant.VENUE_TOPIC).tag(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG).content(JSON.toJSONString(timePeriodStockReduceMqDTO)).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();LocalMessageDO delayCloseLocalMessageD0 = LocalMessageDO.builder().msgId(SnowflakeIdUtil.nextIdStr()).topic(RocketMqConstant.ORDER_TOPIC).tag(RocketMqConstant.ORDER_DELAY_CLOSE_TAG).content(JSON.toJSONString(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build())).nextRetryTime(System.currentTimeMillis()).maxRetryCount(5).build();// 使用编程式事务,保证订单创建、本地消息插入的一致性boolean success = transactionTemplate.execute(status -> {try {int insertCount = baseMapper.insert(orderDO);localMessageService.save(stockReduceLocalMessageDO);// 也保存一个本地消息,进行兜底。防止事务提交成功之后就宕机,延时消息没有发生成功localMessageService.save(delayCloseLocalMessageD0);return insertCount > 0;} catch (Exception ex) {status.setRollbackOnly();throw ex;}});if (success) {// 发送延时消息来关闭未支付的订单SendResult sendResult = orderDelayCloseProducer.sendMessage(OrderDelayCloseMqDTO.builder().orderSn(orderDO.getOrderSn()).build());if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {// 延迟关单已经发生成功,后面扫描的时候,无需再处理LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(delayCloseLocalMessageD0.getId());localMessageDO.setStatus(LocalMessageStatusEnum.INIT.getStatus());localMessageService.updateById(localMessageDO);}// todo 如果出现宕机,可能出现宕机,但是 websocket 消息没有消息,所以前端还要实现一个轮询来保底// 通过 websocket 发送消息,通知前端websocketSendMessageProducer.sendMessage(WebsocketMqDTO.builder().toUsername(orderDO.getUserName()).message(JSON.toJSONString(orderDO)).build());}
}

定时任务

  • 定期扫描本地消息表(local_message)中待处理(未处理、上次处理失败、下次重试时间小于等于现在)的消息
  • 根据消息 Topic 和 tag 调用不同的消息处理器,将本地消息投递到消息队列中
  • 消息投递成功后更新消息状态,失败则通过指数退避算法计算下次重试时间,等待下次重试
  • 使用分布式锁保证集群环境下只有一个实例执行任务

【性能优化】

  • 使用流式查询,避免分页查询的无效扫描
  • 通过批量修改优化单条修改的效率

【策略模式】

  • 通过策略模式,根据不同的 tag 获得不同的 MQ 生产者,避免if else代码
package com.vrs.service.scheduled;import com.alibaba.fastjson2.JSON;
import com.vrs.constant.RocketMqConstant;
import com.vrs.design_pattern.strategy.MessageProcessor;
import com.vrs.domain.dto.mq.OrderDelayCloseMqDTO;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.domain.entity.LocalMessageDO;
import com.vrs.enums.LocalMessageStatusEnum;
import com.vrs.rocketMq.producer.OrderDelayCloseProducer;
import com.vrs.rocketMq.producer.TimePeriodStockReduceProducer;
import com.vrs.service.LocalMessageService;
import jakarta.annotation.PostConstruct;
import lombok.Cleanup;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;/*** @Author dam* @create 2024/11/17 16:44*/
@Component
@RequiredArgsConstructor
@Slf4j
public class LocalMessageScheduledScan {private final DataSource dataSource;private final LocalMessageService localMessageService;private final TimePeriodStockReduceProducer timePeriodStockReduceProducer;private final OrderDelayCloseProducer orderDelayCloseProducer;private final RedissonClient redissonClient;/*** 使用策略模式处理消息*/// todo 可以优化策略模式的写法,方便代码扩展private final Map<String, MessageProcessor> messageProcessors = new HashMap<>();private final int BATCH_SIZE = 1000;/*** 注册 tag 和其对应的消息处理器*/@PostConstructpublic void init() {messageProcessors.put(RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG, mqDTO -> {TimePeriodStockReduceMqDTO dto = JSON.parseObject(mqDTO.getContent(), TimePeriodStockReduceMqDTO.class);return timePeriodStockReduceProducer.sendMessage(dto);});messageProcessors.put(RocketMqConstant.ORDER_DELAY_CLOSE_TAG, mqDTO -> {OrderDelayCloseMqDTO dto = JSON.parseObject(mqDTO.getContent(), OrderDelayCloseMqDTO.class);return orderDelayCloseProducer.sendMessage(dto);});}/*** 定时任务:扫描并处理本地消息* 每分钟执行一次*/@Scheduled(cron = "0 */1 * * * ?")@SneakyThrowspublic void processLocalMessage() {RLock lock = redissonClient.getLock("LocalMessageScan");boolean locked = false;try {locked = lock.tryLock(1, TimeUnit.MINUTES);if (!locked) {log.warn("获取分布式锁失败,跳过本次处理");return;}log.info("开始扫描本地消息表...");long start = System.currentTimeMillis();@Cleanup Connection conn = dataSource.getConnection();@Cleanup Statement stmt = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);stmt.setFetchSize(Integer.MIN_VALUE);// 查询sql,只查询关键的字段String sql = "SELECT id,msg_id,topic,tag,content,retry_count,max_retry_count,next_retry_time FROM local_message where " +"is_deleted = 0 and (status = 0 OR status = 1) and next_retry_time<" + start;@Cleanup ResultSet rs = stmt.executeQuery(sql);List<LocalMessageDO> localMessageBuffer = new ArrayList<>();while (rs.next()) {// 获取数据中的属性LocalMessageDO localMessageDO = new LocalMessageDO();localMessageDO.setId(rs.getLong("id"));localMessageDO.setMsgId(rs.getString("msg_id"));localMessageDO.setTopic(rs.getString("topic"));localMessageDO.setTag(rs.getString("tag"));localMessageDO.setContent(rs.getString("content"));localMessageDO.setRetryCount(rs.getInt("retry_count"));localMessageDO.setMaxRetryCount(rs.getInt("max_retry_count"));localMessageDO.setNextRetryTime(rs.getLong("next_retry_time"));if (localMessageDO.getRetryCount() > localMessageDO.getMaxRetryCount()) continue;localMessageBuffer.add(localMessageDO);if (localMessageBuffer.size() > BATCH_SIZE) {batchProcessMessages(localMessageBuffer);localMessageBuffer.clear();}}if (!localMessageBuffer.isEmpty()) {batchProcessMessages(localMessageBuffer);}log.info("结束扫描本地消息表..." + (System.currentTimeMillis() - start) + "ms");} catch (Exception e) {log.error("处理本地消息表时发生异常", e);throw e; // 或根据业务决定是否抛出} finally {if (locked && lock.isHeldByCurrentThread()) {lock.unlock();}}}/*** 批量处理消息*/private void batchProcessMessages(List<LocalMessageDO> messages) {// 成功和失败的消息分开处理List<Long> successIds = new ArrayList<>();List<Long> retryIds = new ArrayList<>();List<Long> arriveMaxRetryCountIds = new ArrayList<>();Map<Long, String> failureReasons = new HashMap<>();for (LocalMessageDO message : messages) {try {if (message.getRetryCount() > message.getMaxRetryCount()) {// 已经到达最大重试次数arriveMaxRetryCountIds.add(message.getId());continue;}MessageProcessor processor = messageProcessors.get(message.getTag());SendResult sendResult = processor.process(message);if (sendResult.getSendStatus().equals(SendStatus.SEND_OK)) {successIds.add(message.getId());} else {retryIds.add(message.getId());failureReasons.put(message.getId(), "MQ发送状态: " + sendResult.getSendStatus());}} catch (Exception e) {log.error("处理消息 {} 时发生异常", message.getMsgId(), e);retryIds.add(message.getId());failureReasons.put(message.getId(), "处理异常: " + e.getMessage());}}// 批量更新状态if (!successIds.isEmpty()) {batchUpdateMessagesStatus(successIds, LocalMessageStatusEnum.SEND_SUCCESS);}if (!arriveMaxRetryCountIds.isEmpty()) {// todo 通知人工处理batchUpdateMessagesStatus(arriveMaxRetryCountIds, LocalMessageStatusEnum.ARRIVE_MAX_RETRY_COUNT);}if (!retryIds.isEmpty()) {batchUpdateRetryMessages(retryIds, failureReasons);}}/*** 批量更新消息状态*/private void batchUpdateMessagesStatus(List<Long> ids, LocalMessageStatusEnum status) {if (ids.isEmpty()) return;List<LocalMessageDO> updates = ids.stream().map(id -> {LocalMessageDO update = new LocalMessageDO();update.setId(id);update.setStatus(status.getStatus());if (status == LocalMessageStatusEnum.SEND_FAIL) {update.setRetryCount(localMessageService.getById(id).getMaxRetryCount());}return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 批量更新重试消息*/private void batchUpdateRetryMessages(List<Long> ids, Map<Long, String> failReasons) {if (ids.isEmpty()) return;List<LocalMessageDO> messages = localMessageService.listByIds(ids);List<LocalMessageDO> updates = messages.stream().map(message -> {LocalMessageDO update = new LocalMessageDO();update.setId(message.getId());update.setStatus(LocalMessageStatusEnum.SEND_FAIL.getStatus());update.setRetryCount(message.getRetryCount() + 1);update.setNextRetryTime(getNextRetryTime(message.getRetryCount() + 1));update.setFailReason(failReasons.get(message.getId()));return update;}).collect(Collectors.toList());if (updates.size() > 0) {localMessageService.updateBatchById(updates);}}/*** 获取下次重试时间** @param retryCount* @return*/private long getNextRetryTime(int retryCount) {long interval = (long) Math.min(Math.pow(2, retryCount) * 1000, 3600 * 1000);return System.currentTimeMillis() + interval;}
}

库存扣减

注意库存扣减需要通过幂等组件来保证消费幂等性,key 是订单号,即保证同一个订单号只能扣减库存一次

package com.vrs.rocketMq.listener;import com.vrs.annotation.Idempotent;
import com.vrs.constant.RocketMqConstant;
import com.vrs.domain.dto.mq.TimePeriodStockReduceMqDTO;
import com.vrs.enums.IdempotentSceneEnum;
import com.vrs.service.TimePeriodService;
import com.vrs.templateMethod.MessageWrapper;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** @Author dam* @create 2024/9/20 21:30*/
@Slf4j(topic = RocketMqConstant.VENUE_TOPIC)
@Component
@RocketMQMessageListener(topic = RocketMqConstant.VENUE_TOPIC,consumerGroup = RocketMqConstant.VENUE_CONSUMER_GROUP + "-" + RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG,messageModel = MessageModel.CLUSTERING,// 监听tagselectorType = SelectorType.TAG,selectorExpression = RocketMqConstant.TIME_PERIOD_STOCK_REDUCE_TAG
)
@RequiredArgsConstructor
public class TimePeriodStockReduceListener implements RocketMQListener<MessageWrapper<TimePeriodStockReduceMqDTO>> {private final TimePeriodService timePeriodService;/*** 消费消息的方法* 方法报错就会拒收消息** @param messageWrapper 消息内容,类型和上面的泛型一致。如果泛型指定了固定的类型,消息体就是我们的参数*/@Idempotent(uniqueKeyPrefix = "time_period_stock_reduce:",key = "#messageWrapper.getMessage().getOrderSn()+''",scene = IdempotentSceneEnum.MQ,keyTimeout = 3600L)@SneakyThrows@Overridepublic void onMessage(MessageWrapper<TimePeriodStockReduceMqDTO> messageWrapper) {// 开头打印日志,平常可 Debug 看任务参数,线上可报平安(比如消息是否消费,重新投递时获取参数等)log.info("[消费者] 更新时间段的库存和空闲场号,时间段ID:{}", messageWrapper.getMessage().getTimePeriodId());timePeriodService.reduceStockAndBookedSlots(messageWrapper.getMessage());}
}

【service】

/*** 扣减库存** @param timePeriodStockReduceMqDTO*/
@Override
public void reduceStockAndBookedSlots(TimePeriodStockReduceMqDTO timePeriodStockReduceMqDTO) {baseMapper.updateStockAndBookedSlots(timePeriodStockReduceMqDTO.getTimePeriodId(), timePeriodStockReduceMqDTO.getPartitionId(), timePeriodStockReduceMqDTO.getCourtIndex());
}

【mapper】

<update id="updateStockAndBookedSlots"><![CDATA[UPDATE time_periodSET booked_slots = booked_slots | (1 << #{partitionIndex}), stock = stock - 1WHERE id = #{timePeriodId} AND stock > 0 AND partition_id = #{partitionId}]]>
</update>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/81376.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/81376.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/81376.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络——客户端/服务端,URI与URL的区别,以及TCP/IP核心机制全解析

文章目录 客户端/服务端&#xff0c;URI与URL的区别&#xff0c;以及TCP/IP核心机制全解析一、客户端/服务端通信模型概述二、URI 与 URL 的概念与区别1. URL&#xff08;统一资源定位符&#xff09;2. URI&#xff08;统一资源标识符&#xff09;3. URI 与 URL 的关系 三、SYN…

柔性PZT压电薄膜多维力传感器在微创手术机器人的应用

随着医疗技术的迅速发展&#xff0c;微创手术机器人正在成为外科手术的重要助手。与传统开放式手术相比&#xff0c;微创手术创伤小、恢复快、感染率低&#xff0c;对手术器械的精细操控性和感知能力提出了更高要求。多维力传感器作为机器人“触觉”的核心部件&#xff0c;对提…

SpringAI整合DeepSeek生成图表

利用Spring-ai-openai集成DeepSeek ①、在DeepSeek开放平台创建API KEY ②、创建springboot项目&#xff0c;引入spring-ai-openai依赖&#xff0c;创建配置文件&#xff0c;配置deepseek的url和api key ③、具体的实现业务应用 RestController public class ChatD…

xss-lab靶场基础详解第1~3关

第一关 我去&#xff0c;还是得多学基础啊 http://127.0.0.1/xss-labs/level1.php?name<u>a</u> 这个看他的网站源码&#xff0c;可以看到他没有过滤&#xff0c;没有被编码 然后在name<script>alert(1)</script>&#xff0c;就算过关了 第二关 …

【MySQL】聚合查询 和 分组查询

个人主页&#xff1a;♡喜欢做梦 欢迎 &#x1f44d;点赞 ➕关注 ❤️收藏 &#x1f4ac;评论 目录 &#x1f334; 一、聚合查询 &#x1f332;1.概念 &#x1f332;2.聚合查询函数 COUNT&#xff08;&#xff09; SUM&#xff08;&#xff09; AVG&#xff08;&…

计算机启动流程中,都干了啥事。比如文件挂在,操作系统加载,中断向量表加载,磁盘初始化在哪阶段。

建议在电脑上看&#xff0c;手机上格式有点问题&#xff0c;认真读&#xff0c;这方面没问题的&#xff0c;肝了一天。 目录.计算机启动详解 一.计算机启动直观图二.步骤详解前置准备磁盘初始化1.开机阶段2.执行BIOS阶段3.执行引导记录&#xff08;MBR&#xff09;阶段4.操作系…

后端开发技术之Log日志框架

第一章 日志原理 1.1 log发展历史 从JDK1.4开始提供java.until.logging&#xff0c;后来大佬发现JUL太难用了&#xff0c;就自己手撸了个log4j&#xff0c;后来log4j发现安全漏洞&#xff0c;加上代码结构问题难以维护&#xff0c;于是从1.2就停止更新log4j&#xff0c;并又重…

美丽天天秒链动2+1源码(新零售商城搭建)

什么是链动21模式&#xff1f; 链动21主要是建立团队模式&#xff0c;同时快速提升销量。是目前成员中速度最快的裂变模式。链动21模式合理合规&#xff0c;同时激励用户 公司的利润分享机制&#xff0c;让您在享受购物折扣的同时&#xff0c;也能促进并获得客观收益。 链动21模…

Python10天冲刺-设计模型之策略模式

策略模式是一种行为设计模式&#xff0c;它允许你在运行时动态地改变对象的行为。这种模式的核心思想是将一组相关的算法封装在一起&#xff0c;并让它们相互替换。 下面是使用 Python 实现策略模式的一个示例&#xff1a; 示例代码 假设我们有一个简单的购物车系统&#xf…

【CTFer成长之路】XSS的魔力

XSS闯关 level1 访问url&#xff1a; http://c884a553-d874-4514-9c32-c19c7d7b6e1c.node3.buuoj.cn/level1?usernamexss 因为是xss&#xff0c;所以对传参进行测试&#xff0c;修改?username1&#xff0c;进行访问 会发现username参数传入什么&#xff0c;welcome之后就…

自主机器人模拟系统

一、系统概述 本代码实现了一个基于Pygame的2D自主机器人模拟系统&#xff0c;具备以下核心功能&#xff1a; 双模式控制&#xff1a;支持手动控制&#xff08;WASD键&#xff09;和自动导航模式&#xff08;鼠标左键设定目标&#xff09; 智能路径规划&#xff1a;采用改进型…

快速上手非关系型数据库-MongoDB

简介 MongoDB 是一个基于文档的 NoSQL 数据库&#xff0c;由 MongoDB Inc. 开发。 NoSQL&#xff0c;指的是非关系型的数据库。NoSQL有时也称作Not Only SQL的缩写&#xff0c;是对不同于传统的关系型数据库的数据库管理系统的统称。 MongoDB 的设计理念是为了应对大数据量、…

性能优化实践:启动优化方案

性能优化实践&#xff1a;启动优化方案 在Flutter应用开发中&#xff0c;启动性能是用户体验的第一印象&#xff0c;也是应用性能优化的重要环节。本文将从理论到实践&#xff0c;深入探讨Flutter应用的启动优化方案。 一、Flutter应用启动流程分析 1. 启动类型 冷启动&…

在文本废墟中打捞月光

在文本废墟中打捞月光 ----再读三三的《山顶上是海》之“暗室”所理 今天是2025年5月1日&#xff0c;传统的“五一”小长假。当我早饭后“坐”在卫生间的那几分钟里&#xff0c;闺女和儿子就骑着家中仅有的两辆电动车去了图书馆。我是该做些什么&#xff1f; 于是我左手拿着三…

C++之类和对象基础

⾯向对象三⼤特性&#xff1a;封装、继承、多态 类和对象 一.类的定义1. 类的定义格式2.类域 二.实例化1.对象2.对象的大小 三.this指针 在 C 的世界里&#xff0c;类和对象构成了面向对象编程&#xff08;Object-Oriented Programming&#xff0c;OOP&#xff09;的核心框架&…

计算机网络——HTTP/IP 协议通俗入门详解

HTTP/IP 协议通俗入门详解 一、什么是 HTTP 协议&#xff1f;1. 基本定义2. HTTP 是怎么工作的&#xff1f; 二、HTTP 协议的特点三、HTTPS 是什么&#xff1f;它和 HTTP 有啥区别&#xff1f;1. HTTPS 概述2. HTTP vs HTTPS 四、HTTP 的通信过程步骤详解&#xff1a; 五、常见…

使用 Java 实现一个简单且高效的任务调度框架

目录 一、任务调度系统概述 &#xff08;一&#xff09;任务调度的目标 &#xff08;二&#xff09;任务调度框架的关键组成 二、任务状态设计 &#xff08;一&#xff09;任务状态流转设计 &#xff08;二&#xff09;任务表设计&#xff08;SQL&#xff09; 三、单机任…

基于GPT 模板开发智能写作辅助应用

目录 项目说明 1. 项目背景 2. 项目目标 3. 功能需求 4. 技术选型 项目结构 详细代码实现 前端代码(client) client/src/main.js client/src/App.vue client/src/components/HistoryList.vue 后端代码(server) server/app.js server/routes/api.js server/mo…

linux 使用nginx部署next.js项目,并使用pm2守护进程

前言 本文基于&#xff1a;操作系统 CentOS Stream 8 使用工具&#xff1a;Xshell8、Xftp8 服务器基础环境&#xff1a; node - 请查看 linux安装node并全局可用pm2 - 请查看 linux安装pm2并全局可用nginx - 请查看 linux 使用nginx部署vue、react项目 所需服务器基础环境&…

使用huggingface_hub需要注意的事项

在安装huggingface_hub的时候要注意如果你的python是放在c盘下时记得用管理员模式命令行来安装huggingface_hub&#xff0c;否则安装过程会报错&#xff0c;之后也不会有huggingface-cli命令。 如果安装时因为没有用管理员权限安装而报错了&#xff0c;可以先卸载huggingface-…