摘要
本文深入探讨了Java并发编程的核心概念及其在电商系统中的实际应用。从基础并发机制到高级并发工具,结合电商业务场景中的典型问题,如高并发秒杀、库存管理、订单处理等,提供了实用的解决方案和最佳实践。
1. Java并发编程基础
1.1 并发与并行
并发(Concurrency)是指系统能够处理多个任务的能力,而并行(Parallelism)则是指系统能够同时执行多个任务。在多核处理器时代,Java通过线程实现了真正的并行计算。
1.2 Java内存模型(JMM)
Java内存模型定义了线程如何与主内存以及彼此的工作内存进行交互。JMM解决了可见性、原子性和有序性问题,为并发编程提供了基础保障。
public class VisibilityExample {private boolean flag = false;public void writer() {flag = true; // 线程A执行}public void reader() {if (flag) { // 线程B执行System.out.println("Flag is true");}}
}
在上述代码中,如果没有适当的同步机制,线程A对flag的修改可能对线程B不可见。
1.3 synchronized与volatile
- synchronized:保证原子性和可见性,通过管程(Monitor)实现
- volatile:保证可见性和有序性,但不保证原子性
public class Counter {private volatile int count = 0;public synchronized void increment() {count++; // 复合操作,需要synchronized保证原子性}public int getCount() {return count; // volatile保证可见性}
}
2. Java并发工具类
2.1 线程池(ThreadPoolExecutor)
线程池通过重用线程降低了线程创建和销毁的开销,提高了系统响应速度。
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, // 核心线程数maximumPoolSize, // 最大线程数keepAliveTime, // 空闲线程存活时间TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000), // 任务队列new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
2.2 并发集合
- ConcurrentHashMap:线程安全的HashMap实现,采用分段锁或CAS机制
- CopyOnWriteArrayList:写时复制,适合读多写少的场景
- BlockingQueue:阻塞队列,用于生产者-消费者模式
2.3 同步工具类
- CountDownLatch:允许一个或多个线程等待其他线程完成操作
- CyclicBarrier:让一组线程到达一个屏障时被阻塞,直到最后一个线程到达时才放开
- Semaphore:控制同时访问资源的线程数量
3. 电商场景中的并发应用
3.1 高并发秒杀系统
秒杀系统是电商场景中典型的并发挑战,需要解决超卖、性能瓶颈等问题。
3.1.1 乐观锁实现
public class SeckillService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;public boolean seckill(Long productId, Long userId) {// 使用Redis的原子操作String key = "seckill:product:" + productId;// 监视商品库存redisTemplate.watch(key);int stock = (int) redisTemplate.opsForValue().get(key);if (stock <= 0) {redisTemplate.unwatch();return false;}// 开启事务redisTemplate.multi();redisTemplate.opsForValue().decrement(key);redisTemplate.opsForList().rightPush("seckill:user:" + productId, userId);// 执行事务List<Object> exec = redisTemplate.exec();if (exec == null || exec.isEmpty()) {// 事务执行失败,可能是库存已被其他线程修改return false;}return true;}
}
3.1.2 令牌桶限流
public class RateLimiter {private final int maxTokens;private final int refillRate;private int currentTokens;private long lastRefillTimestamp;public synchronized boolean tryAcquire() {refill();if (currentTokens > 0) {currentTokens--;return true;}return false;}private void refill() {long now = System.currentTimeMillis();long elapsedTime = now - lastRefillTimestamp;int tokensToAdd = (int) (elapsedTime * refillRate / 1000);if (tokensToAdd > 0) {currentTokens = Math.min(maxTokens, currentTokens + tokensToAdd);lastRefillTimestamp = now;}}
}
3.2 库存管理系统
库存管理需要保证数据一致性,防止超卖和少卖。
3.2.1 分布式锁实现
public class DistributedLock {private final RedisTemplate<String, String> redisTemplate;private final String lockKey;private final String requestId;private final int expireTime;public boolean tryLock() {return Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, expireTime, TimeUnit.MILLISECONDS));}public boolean unlock() {// 使用Lua脚本确保原子性String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +"return redis.call('del', KEYS[1]) " +"else " +"return 0 " +"end";DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);Long result = redisTemplate.execute(redisScript, Collections.singletonList(lockKey), requestId);return result != null && result == 1;}
}
3.2.2 库存扣减方案
@Service
public class InventoryService {@Autowiredprivate DistributedLockFactory lockFactory;@Transactionalpublic boolean deductInventory(Long productId, int quantity) {String lockKey = "inventory:lock:" + productId;DistributedLock lock = lockFactory.getLock(lockKey);try {if (lock.tryLock(3, TimeUnit.SECONDS)) {// 查询库存Inventory inventory = inventoryMapper.selectByProductId(productId);if (inventory == null || inventory.getQuantity() < quantity) {return false;}// 扣减库存inventory.setQuantity(inventory.getQuantity() - quantity);inventoryMapper.updateById(inventory);// 记录库存流水InventoryLog log = new InventoryLog();log.setProductId(productId);log.setQuantity(-quantity);log.setCreateTime(new Date());inventoryLogMapper.insert(log);return true;}return false;} catch (InterruptedException e) {Thread.currentThread().interrupt();return false;} finally {lock.unlock();}}
}
3.3 订单处理系统
订单处理涉及多个步骤,需要保证最终一致性。
3.3.1 异步订单处理
@Service
public class OrderService {@Autowiredprivate ThreadPoolExecutor orderExecutor;@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate PaymentService paymentService;@Autowiredprivate InventoryService inventoryService;public void createOrder(Order order) {// 保存订单orderMapper.insert(order);// 异步处理订单orderExecutor.submit(() -> {try {// 扣减库存boolean deductResult = inventoryService.deductInventory(order.getProductId(), order.getQuantity());if (!deductResult) {// 库存不足,取消订单order.setStatus(OrderStatus.CANCELED.getCode());orderMapper.updateById(order);return;}// 调用支付服务boolean payResult = paymentService.processPayment(order.getId(), order.getAmount());if (payResult) {// 支付成功,更新订单状态order.setStatus(OrderStatus.PAID.getCode());} else {// 支付失败,回滚库存inventoryService.rollbackInventory(order.getProductId(), order.getQuantity());order.setStatus(OrderStatus.PAY_FAILED.getCode());}orderMapper.updateById(order);} catch (Exception e) {// 处理异常,记录日志log.error("订单处理异常", e);order.setStatus(OrderStatus.EXCEPTION.getCode());orderMapper.updateById(order);}});}
}
3.3.2 订单状态机
public enum OrderStatus {PENDING(1, "待支付"),PAID(2, "已支付"),SHIPPED(3, "已发货"),COMPLETED(4, "已完成"),CANCELED(5, "已取消"),PAY_FAILED(6, "支付失败"),EXCEPTION(7, "异常");private final int code;private final String desc;// 状态转换规则private static final Map<OrderStatus, Set<OrderStatus>> ALLOWED_TRANSITIONS = new EnumMap<>(OrderStatus.class);static {ALLOWED_TRANSITIONS.put(PENDING, new HashSet<>(Arrays.asList(PAID, CANCELED)));ALLOWED_TRANSITIONS.put(PAID, new HashSet<>(Arrays.asList(SHIPPED, CANCELED)));ALLOWED_TRANSITIONS.put(SHIPPED, new HashSet<>(Collections.singletonList(COMPLETED)));ALLOWED_TRANSITIONS.put(CANCELED, Collections.emptySet());ALLOWED_TRANSITIONS.put(COMPLETED, Collections.emptySet());ALLOWED_TRANSITIONS.put(PAY_FAILED, new HashSet<>(Arrays.asList(PENDING, CANCELED)));ALLOWED_TRANSITIONS.put(EXCEPTION, new HashSet<>(Arrays.asList(PENDING, CANCELED)));}public boolean canTransitionTo(OrderStatus newStatus) {return ALLOWED_TRANSITIONS.get(this).contains(newStatus);}
}
4. 性能优化与最佳实践
4.1 减少锁竞争
- 缩小锁范围:只锁定必要的代码块
- 使用读写锁:读多写少场景下提高并发性
- 分段锁:如ConcurrentHashMap的实现方式
public class SegmentLockCache<K, V> {private final int segmentCount;private final List<Map<K, V>> segments;private final List<Lock> locks;public SegmentLockCache(int segmentCount) {this.segmentCount = segmentCount;this.segments = new ArrayList<>(segmentCount);this.locks = new ArrayList<>(segmentCount);for (int i = 0; i < segmentCount; i++) {segments.add(new HashMap<>());locks.add(new ReentrantLock());}}public void put(K key, V value) {int segmentIndex = key.hashCode() % segmentCount;Lock lock = locks.get(segmentIndex);Map<K, V> segment = segments.get(segmentIndex);lock.lock();try {segment.put(key, value);} finally {lock.unlock();}}public V get(K key) {int segmentIndex = key.hashCode() % segmentCount;Map<K, V> segment = segments.get(segmentIndex);// 读操作不加锁,可能会有脏读,根据业务场景决定return segment.get(key);}
}
4.2 无锁编程
使用CAS(Compare-And-Swap)操作实现无锁算法,提高并发性能。
public class NonBlockingCounter {private final AtomicLong counter = new AtomicLong(0);public void increment() {long oldValue, newValue;do {oldValue = counter.get();newValue = oldValue + 1;} while (!counter.compareAndSet(oldValue, newValue));}public long get() {return counter.get();}
}
4.3 线程池调优
根据业务特点合理配置线程池参数:
// CPU密集型任务
ThreadPoolExecutor cpuIntensiveExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(),60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>()
);
// IO密集型任务
ThreadPoolExecutor ioIntensiveExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2,Runtime.getRuntime().availableProcessors() * 4,60L, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000),new ThreadPoolExecutor.CallerRunsPolicy()
);
5. 场景清单
场景1:基础并发概念
请解释一下Java中的volatile关键字和synchronized关键字的区别?
回答:
volatile和synchronized都是Java中用于实现线程同步的机制,但它们有以下区别:
- 功能范围:
- volatile只能修饰变量,而synchronized可以修饰方法、代码块
- volatile保证变量的可见性和有序性,但不保证原子性
- synchronized保证原子性、可见性和有序性
- 实现机制:
- volatile通过内存屏障和禁止指令重排序实现
- synchronized通过管程(Monitor)机制实现,涉及对象头的Mark Word
- 性能影响:
- volatile不会引起线程上下文切换,性能开销较小
- synchronized可能引起线程阻塞和上下文切换,性能开销较大
- 使用场景:
- volatile适用于一个线程写,多个线程读的场景
- synchronized适用于复合操作或需要保证原子性的场景
在实际应用中,我会根据具体需求选择合适的同步机制。例如,在电商秒杀系统中,对于商品库存这种需要保证原子性的操作,我会使用synchronized或分布式锁;而对于状态标志位这种简单变量,我会使用volatile。
场景2:线程池应用
在电商系统中,如何合理配置线程池参数?如果遇到任务堆积和拒绝任务的情况,你会如何处理?
回答:
在电商系统中配置线程池参数需要考虑以下几个因素:
- 核心参数配置:
- 核心线程数(corePoolSize):对于CPU密集型任务,设置为CPU核心数;对于IO密集型任务,设置为CPU核心数的2倍左右
- 最大线程数(maximumPoolSize):考虑系统资源限制和业务峰值,通常设置为核心线程数的2-4倍
- 队列容量:根据系统内存和任务处理速度设置,避免OOM
- 线程存活时间(keepAliveTime):根据业务波峰波谷特点设置,通常30秒到几分钟
- 拒绝策略选择:
- AbortPolicy:直接抛出异常,适合关键业务
- CallerRunsPolicy:由提交任务的线程执行,适合非关键业务
- DiscardOldestPolicy:丢弃队列中最老的任务,适合可容忍任务丢失的场景
- DiscardPolicy:直接丢弃任务,适合可容忍任务丢失的场景
- 任务堆积处理:
- 监控线程池队列大小和活跃线程数,设置告警阈值
- 实现动态调整线程池参数的能力,应对流量高峰
- 对于长时间运行的任务,设置超时机制
- 考虑使用异步+回调方式,避免阻塞线程
在实际项目中,我会这样处理电商订单处理线程池:
// 订单处理线程池配置
ThreadPoolExecutor orderExecutor = new ThreadPoolExecutor(10, // 核心线程数50, // 最大线程数60, // 空闲线程存活时间(秒)TimeUnit.SECONDS,new LinkedBlockingQueue<>(1000), // 任务队列new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build(),new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 监控线程池状态
ScheduledExecutorService monitorExecutor = Executors.newSingleThreadScheduledExecutor();
monitorExecutor.scheduleAtFixedRate(() -> {int activeCount = orderExecutor.getActiveCount();int queueSize = orderExecutor.getQueue().size();long completedTaskCount = orderExecutor.getCompletedTaskCount();// 记录监控指标log.info("OrderPool Stats: active={}, queue={}, completed={}", activeCount, queueSize, completedTaskCount);// 如果队列堆积超过阈值,可以触发告警或动态调整参数if (queueSize > 800) {log.warn("OrderPool queue size exceeds threshold: {}", queueSize);// 可以触发告警或动态调整参数}
}, 0, 1, TimeUnit.MINUTES);
场景3:高并发秒杀系统设计
请设计一个高并发的秒杀系统,需要考虑哪些关键点?如何防止超卖?
回答:
设计高并发秒杀系统需要考虑以下几个关键点:
- 系统架构设计:
- 前端:页面静态化,CDN加速,按钮防重复提交
- 网关层:限流、熔断、缓存
- 服务层:服务拆分,异步处理
- 数据层:读写分离,分库分表
- 防止超卖方案:
- 数据库层面:使用乐观锁或悲观锁
-- 乐观锁更新 UPDATE product SET stock = stock - 1, version = version + 1 WHERE id = #{productId} AND stock > 0 AND version = #{version}
- Redis层面:使用Redis原子操作
// Lua脚本保证原子性 String script = "local stock = redis.call('get', KEYS[1]) " +"if tonumber(stock) > 0 then " +" redis.call('decr', KEYS[1]) " +" return 1 " +"else " +" return 0 " +"end";
- 消息队列:将请求放入消息队列,按顺序处理
- 数据库层面:使用乐观锁或悲观锁
- 限流措施:
- 接口限流:使用令牌桶或漏桶算法
- 用户限流:限制单个用户的请求频率
- 总体限流:系统级别的流量控制
- 缓存策略:
- 商品信息缓存:提前加载到Redis
- 库存缓存:使用Redis存储库存
- 本地缓存:热点数据本地缓存
- 异步处理:
- 下单请求先落库,状态为处理中
- 异步校验库存、扣减库存
- 成功后更新订单状态,失败则回滚
实际代码实现示例:
@Service
public class SeckillService {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ProductMapper productMapper;@Autowiredprivate OrderMapper orderMapper;// 秒杀请求入口public boolean seckill(Long productId, Long userId) {// 1. 限流检查if (!rateLimiter.tryAcquire()) {throw new BusinessException("系统繁忙,请稍后再试");}// 2. 用户限流检查String userKey = "seckill:user:" + userId + ":" + productId;if (redisTemplate.hasKey(userKey)) {throw new BusinessException("不能重复参与秒杀");}redisTemplate.opsForValue().set(userKey, "1", 1, TimeUnit.HOURS);// 3. 预减库存String stockKey = "seckill:stock:" + productId;Long stock = redisTemplate.opsForValue().decrement(stockKey);if (stock < 0) {// 库存不足,回滚redisTemplate.opsForValue().increment(stockKey);throw new BusinessException("商品已售罄");}// 4. 创建订单(状态为处理中)Order order = new Order();order.setProductId(productId);order.setUserId(userId);order.setStatus(OrderStatus.PROCESSING.getCode());order.setCreateTime(new Date());orderMapper.insert(order);// 5. 发送消息到MQ进行异步处理rabbitTemplate.convertAndSend("seckill.order", order);return true;}// 消费者处理订单@RabbitListener(queues = "seckill.order")public void handleOrder(Order order) {try {// 1. 真正扣减数据库库存Product product = productMapper.selectById(order.getProductId());if (product.getStock() <= 0) {// 库存不足,更新订单状态为失败order.setStatus(OrderStatus.FAILED.getCode());orderMapper.updateById(order);return;}// 使用乐观锁扣减库存int updateCount = productMapper.decreaseStockWithVersion(order.getProductId(), product.getVersion());if (updateCount == 0) {// 版本号不匹配,说明库存已被其他线程修改order.setStatus(OrderStatus.FAILED.getCode());orderMapper.updateById(order);return;}// 2. 更新订单状态为成功order.setStatus(OrderStatus.SUCCESS.getCode());orderMapper.updateById(order);// 3. 发送支付消息rabbitTemplate.convertAndSend("payment.order", order);} catch (Exception e) {log.error("处理秒杀订单异常", e);// 更新订单状态为异常order.setStatus(OrderStatus.EXCEPTION.getCode());orderMapper.updateById(order);}}
}
场景4:分布式事务处理
在电商系统中,下单操作涉及扣减库存、创建订单、用户账户扣款等多个步骤,如何保证数据一致性?
回答:
在电商系统中,下单操作涉及多个服务的数据一致性,可以采用以下方案:
- 分布式事务方案选择:
- 2PC/3PC:强一致性方案,但性能较差,可用性不高
- TCC:Try-Confirm-Cancel模式,适用于性能要求高的场景
- 本地消息表:基于消息队列的最终一致性方案
- Saga模式:将长事务拆分为多个本地事务,通过补偿机制保证一致性
- 电商系统推荐方案:
对于电商下单场景,我推荐采用基于消息队列的最终一致性方案,结合本地消息表或事务消息。 - 具体实现:
@Service
public class OrderService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate InventoryService inventoryService;@Autowiredprivate AccountService accountService;@Autowiredprivate TransactionTemplate transactionTemplate;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate MessageLogMapper messageLogMapper;// 下单入口public void placeOrder(OrderDTO orderDTO) {// 1. 创建订单(本地事务)Order order = createOrder(orderDTO);// 2. 发送事务消息sendTransactionMessage(order);}// 创建订单(本地事务)private Order createOrder(OrderDTO orderDTO) {return transactionTemplate.execute(status -> {// 1. 创建订单Order order = new Order();BeanUtils.copyProperties(orderDTO, order);order.setStatus(OrderStatus.PENDING_PAYMENT.getCode());order.setCreateTime(new Date());orderMapper.insert(order);// 2. 记录消息日志(状态为发送中)MessageLog messageLog = new MessageLog();messageLog.setMessageId(UUID.randomUUID().toString());messageLog.setMessage(JSON.toJSONString(order));messageLog.setStatus(MessageStatus.SENDING.getCode());messageLog.setCreateTime(new Date());messageLogMapper.insert(messageLog);return order;});}// 发送事务消息private void sendTransactionMessage(Order order) {// 1. 发送消息到MQrabbitTemplate.convertAndSend("order.process", order);// 2. 更新消息状态为已发送messageLogMapper.updateStatusByMessageId(order.getMessageId(), MessageStatus.SENT.getCode());}// 消息消费者处理订单@RabbitListener(queues = "order.process")public void processOrder(Order order) {try {// 1. 扣减库存boolean inventoryResult = inventoryService.deductInventory(order.getProductId(), order.getQuantity());if (!inventoryResult) {// 库存不足,取消订单cancelOrder(order, "库存不足");return;}// 2. 扣减用户账户boolean accountResult = accountService.deductBalance(order.getUserId(), order.getAmount());if (!accountResult) {// 账户余额不足,回滚库存并取消订单inventoryService.rollbackInventory(order.getProductId(), order.getQuantity());cancelOrder(order, "账户余额不足");return;}// 3. 更新订单状态为已完成order.setStatus(OrderStatus.COMPLETED.getCode());orderMapper.updateById(order);// 4. 更新消息状态为已处理messageLogMapper.updateStatusByMessageId(order.getMessageId(), MessageStatus.PROCESSED.getCode());} catch (Exception e) {log.error("处理订单异常", e);// 处理失败,等待重试}}// 取消订单private void cancelOrder(Order order, String reason) {order.setStatus(OrderStatus.CANCELED.getCode());order.setCancelReason(reason);orderMapper.updateById(order);// 更新消息状态为已处理messageLogMapper.updateStatusByMessageId(order.getMessageId(), MessageStatus.PROCESSED.getCode());}// 定时任务补偿处理@Scheduled(fixedDelay = 60000)public void compensateMessages() {// 查询发送中的消息List<MessageLog> messageLogs = messageLogMapper.selectByStatus(MessageStatus.SENDING.getCode());for (MessageLog messageLog : messageLogs) {try {// 重新发送消息Order order = JSON.parseObject(messageLog.getMessage(), Order.class);rabbitTemplate.convertAndSend("order.process", order);// 更新消息状态为已发送messageLogMapper.updateStatusByMessageId(messageLog.getMessageId(), MessageStatus.SENT.getCode());} catch (Exception e) {log.error("补偿消息发送失败", e);}}// 查询已发送但未处理的消息List<MessageLog> sentMessages = messageLogMapper.selectByStatus(MessageStatus.SENT.getCode());for (MessageLog messageLog : sentMessages) {// 检查消息是否超时(如5分钟)if (System.currentTimeMillis() - messageLog.getCreateTime().getTime() > 5 * 60 * 1000) {try {Order order = JSON.parseObject(messageLog.getMessage(), Order.class);// 回滚操作inventoryService.rollbackInventory(order.getProductId(), order.getQuantity());// 取消订单cancelOrder(order, "订单处理超时");// 更新消息状态为已处理messageLogMapper.updateStatusByMessageId(messageLog.getMessageId(), MessageStatus.PROCESSED.getCode());} catch (Exception e) {log.error("超时消息处理失败", e);}}}}
}
- 可靠性保障:
- 消息持久化:确保消息不丢失
- 消息确认机制:消费者确认处理成功
- 重试机制:失败后自动重试
- 补偿机制:定时任务检查并补偿未处理的消息
- 幂等性设计:防止重复处理导致的数据不一致
- 监控与告警:
- 监控消息堆积情况
- 监控事务处理成功率
- 设置异常告警机制
这种方案实现了最终一致性,在保证业务连续性的同时,提供了较好的性能和可用性。在实际应用中,还可以结合分布式事务框架如Seata来简化实现。
6. 总结
Java并发编程在电商系统中扮演着至关重要的角色。从基础的线程安全控制到高级的并发模式应用,合理利用Java并发工具可以显著提升系统性能和稳定性。在电商场景中,如秒杀、库存管理、订单处理等核心业务,都需要精心设计的并发控制机制来保证系统的高可用和数据一致性。