基础模块
1. 邮箱发送功能
最初设计的接口 (雏形)
public interface EmailService {/*** 发送验证码邮件** @param email 目标邮箱* @return 发送的code* @throws RuntimeException 如果发送邮件失败,将抛出异常*/String sendVerificationCode(String email);/*** 校验验证码是否正确** @param email 邮箱地址* @param code 用户输入的验证码* @return true 表示校验通过,false 为不通过*/boolean checkVerificationCode(String email, String code);/*** 判断邮箱当前是否处于验证码限流状态** @param email 邮箱地址* @return true 表示当前已限流,不可发送,false 表示未限流,可以发送*/boolean isVerificationCodeRateLimited(String email);
}
EmailServiceImpl 实现类 ( 具体实现)
@Service
public class EmailServiceImpl implements EmailService {@Autowiredprivate ObjectMapper objectMapper;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Value("${mail.verify-code.limit-expire-seconds}")private int limitExpireSeconds;@Overridepublic String sendVerificationCode(String email) {// ...}@Overridepublic boolean checkVerificationCode(String email, String code) {// ...}@Overridepublic boolean isVerificationCodeRateLimited(String email) {// ...}
}
分析
先来分析实现类中的三个注入的对象的用途:
redisTemplate:操作redis的接口,可以类比JDBC接口
limitExpireSeconds:application.yaml中的配置,表示一个email在发送一条邮件后,会被限流多长时间,才能发送第二条邮件,配置文件中是60秒,可以按照自己的需求修改
objectMapper:Jackson的接口对象,用于JSON和对象的相互转换
sendVerificationCode方法和"EmailTaskConsumer"方法
到这里,我们可以开始实现一个"基于Redis消息队列"的轻量级异步任务机制了。
消息队列有两个最基础的部分:
1.生产者
2.消费者
从代码层面来说,sendVerificationCode就是所谓的生产者。
生产者的工作步骤:
1.接收email参数,生成一个email对应的code
2.将email、code、以及当前的时间戳封装为一个对象(EmailTask对象)
3.将EmailTask对象序列化为JSON字符串(Redis只支持存储字符串)
4.将JSON字符串写入到消息队列中
5.防止重复请求,给邮箱设置一个限流键(这步可以不包含在sendVerificationCode中)
消费者的工作步骤:
1.每隔一段时间轮询Redis,查看消息队列中是否存在任务
2.将任务的JSON字符串从消息队列中取出,将其反序列化为EmailTask对象
3.根据EmailTask对象中的字段,填充SimpleEmailMessage对象
4.JavaMailSender发送SimpleEmailMessage对象(调用第三方服务,发邮件给用户)
到这里,生产者和消费者需要完成的步骤已经规划完毕,接下来我们看具体代码实现。
生产者部分:
首先是EmailTask对象,拥有三个字段
1.email字段:用户的email
2.code:验证码
3.timestamp:时间戳
@Data
public class EmailTask {private String email;private String code;private long timestamp;
}
接下来是 sendVerificationCode 的实现
@Override
public String sendVerificationCode(String email) {// 检查发送频率if (isVerificationCodeRateLimited(email)) {throw new RuntimeException("验证码发送太频繁,请 60 秒后重试");}// 生成6位随机验证码String verificationCode = RandomCodeUtil.generateNumberCode(6);// 实现异步发送邮件的逻辑try {// 创建邮件任务EmailTask emailTask = new EmailTask();// 初始化邮件任务内容// 1. 邮件目的邮箱// 2. 验证码// 3. 时间戳emailTask.setEmail(email);emailTask.setCode(verificationCode);emailTask.setTimestamp(System.currentTimeMillis());// 将邮件任务存入消息队列// 1. 将任务对象转成 JSON 字符串// 2. 将 JSON 字符串保存到 Redis 模拟的消息队列中String emailTaskJson = objectMapper.writeValueAsString(emailTask);String queueKey = RedisKey.emailTaskQueue();redisTemplate.opsForList().leftPush(queueKey, emailTaskJson);// 设置 email 发送注册验证码的限制String emailLimitKey = RedisKey.registerVerificationLimitCode(email);redisTemplate.opsForValue().set(emailLimitKey, "1", limitExpireSeconds, TimeUnit.SECONDS);return verificationCode;} catch (Exception e) {log.error("发送验证码邮件失败", e);throw new RuntimeException("发送验证码失败,请稍后重试");}
}
消费者部分
EmailTaskConsumer 的实现
@Component
public class EmailTaskConsumer {@Autowiredprivate JavaMailSender mailSender;@Autowiredprivate ObjectMapper objectMapper;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Value("${spring.mail.username}")private String from;// 每 3 秒轮询一次 redis,查看是否有待发的邮件任务@Scheduled(fixedDelay = 3000)public void resume() throws JsonProcessingException {String emailQueueKey = RedisKey.emailTaskQueue();// 从队列中取任务对象while (true) {// 获取任务对象String emailTaskJson = redisTemplate.opsForList().rightPop(emailQueueKey);if (emailTaskJson == null) { // 队列中没有任务对象,退出本次执行break;}// 将 redis 中的 JSON 字符串转成 emailTask 对象EmailTask emailTask = objectMapper.readValue(emailTaskJson, EmailTask.class);String email = emailTask.getEmail();String verificationCode = emailTask.getCode();// 根据 emailTask 对象中的信息// 填充 SimpleMailMessage 对象,然后使用 JavaMailSender 发送邮件SimpleMailMessage mailMessage = new SimpleMailMessage();mailMessage.setFrom(from);mailMessage.setTo(email);mailMessage.setSubject("卡码笔记- 验证码");mailMessage.setText("您的验证码是:" + verificationCode + ",有效期" + 5 + "分钟,请勿泄露给他人。");mailSender.send(mailMessage);// 保存验证码到 Redis// 有效时间为 5 分钟redisTemplate.opsForValue().set(RedisKey.registerVerificationCode(email), verificationCode, 5, TimeUnit.MINUTES);}}
}
checkVerificationCode 的实现
@Override
public boolean checkVerificationCode(String email, String code) {String redisKey = RedisKey.registerVerificationCode(email);String verificationCode = redisTemplate.opsForValue().get(redisKey);if (verificationCode != null && verificationCode.equals(code)) {redisTemplate.delete(redisKey);return true;}return false;
}
checkVerificationCode的实现逻辑很简单,根据用户的email查询出redis种存储的code,和用户输入的code,比较是否相等。在验证成功后需要将Redis中的记录删除。
isVerificationCodeRateLimited 实现
@Override
public boolean isVerificationCodeRateLimited(String email) {String redisKey = RedisKey.registerVerificationLimitCode(email);return redisTemplate.opsForValue().get(redisKey) != null;
}
检查Redis中是否存在email对应的限流键,存在则返回true表示已被限流
2. 排行榜
Spring + Mysql + Redis 异步更新
- 1. 用户提交笔记时,同步写业务数据后异步发送一条消息到 Redis 列表(当作 MQ),消息是 JSON(包含 userId、score 增量、messageId、timestamp、noteId 等)。
- 2. 后台消费进程用 BRPOP/阻塞弹出消息,解析后对 Redis 的 ZSET 做增量更新(ZINCRBY),ZSET 用于排行榜查询。
- 3. 定时任务每小时把 Redis 中的 ZSET 批量持久化到 MySQL(通过 MyBatis 批量 upsert),以确保最终一致性和历史存档。
controller.java
接收用户提交并返回消息
@RestController
@RequestMapping("/notes")
public class NoteController {private final ProducerService producer;@PostMappingpublic ResponseEntity<?> submitNote(@RequestBody NoteSubmitDto dto){// 1. 处理业务写入笔记表(略)// 2. 发送异步消息NoteMessage msg = new NoteMessage(...);producer.sendMessage(msg);return ResponseEntity.ok().build();}
}
Producer
(将消息 push 到 Redis 列表)
@Service
public class ProducerService {private static final String QUEUE = "mq:note:queue";private final RedisTemplate<String, String> redis;private final ObjectMapper mapper = new ObjectMapper();public ProducerService(RedisTemplate<String, String> redis){ this.redis = redis; }public void sendMessage(NoteMessage msg){try {String json = mapper.writeValueAsString(msg);redis.opsForList().leftPush(QUEUE, json);} catch (Exception e) {// 记录失败/监控}}
}
Consumer
(后台阻塞消费,更新 ZSET)
@Service
public class RedisConsumerService {private static final String QUEUE = "mq:note:queue";private static final String DLQ = "mq:note:dlq";private final RedisTemplate<String, String> redis;private final LeaderboardService leaderboard;private final ObjectMapper mapper = new ObjectMapper();private volatile boolean running = true;public RedisConsumerService(RedisTemplate<String, String> redis, LeaderboardService leaderboard) {this.redis = redis; this.leaderboard = leaderboard;}@PostConstructpublic void start() {Thread t = new Thread(this::loop, "redis-mq-consumer");t.setDaemon(true);t.start();}private void loop() {while (running) {try {// 阻塞弹出(0 表示一直阻塞)String json = redis.opsForList().rightPop(QUEUE, 0, TimeUnit.SECONDS);if (json == null) continue;NoteMessage msg = mapper.readValue(json, NoteMessage.class);// 可做幂等校验(基于 messageId)leaderboard.incrementScore(msg.getUserId(), msg.getDelta());} catch (Exception e) {// 解析或处理失败 -> 推到死信队列并记录redis.opsForList().leftPush(DLQ, /* 原始消息 */);}}}@PreDestroypublic void stop() { running = false; }
}
LeaderboardService
(封装 ZSET 操作)
@Service
public class LeaderboardService {private static final String ZKEY = "leaderboard:zset";private final RedisTemplate<String, String> redis;public LeaderboardService(RedisTemplate<String, String> redis){ this.redis = redis; }public void incrementScore(Long userId, double delta){redis.opsForZSet().incrementScore(ZKEY, userId.toString(), delta);}public Set<ZSetOperations.TypedTuple<String>> topN(int n){return redis.opsForZSet().reverseRangeWithScores(ZKEY, 0, n - 1);}public Set<ZSetOperations.TypedTuple<String>> rangeAllWithScores(){return redis.opsForZSet().rangeWithScores(ZKEY, 0, -1);}
}
DB 定时持久化
(每小时)
@Component
public class DbPersistScheduler {private final LeaderboardService leaderboardService;private final LeaderboardMapper mapper;public DbPersistScheduler(LeaderboardService leaderboardService, LeaderboardMapper mapper){this.leaderboardService = leaderboardService; this.mapper = mapper;}// 每小时执行一次(整点)@Scheduled(cron = "0 0 * * * ?")public void persistHourly() {// 读取全部或 top K(注意数据量)Set<ZSetOperations.TypedTuple<String>> entries = leaderboardService.rangeAllWithScores();if (entries.isEmpty()) return;List<Leaderboard> list = entries.stream().map(t -> new Leaderboard(Long.valueOf(t.getValue()), t.getScore().longValue(), new Date())).collect(Collectors.toList());// 批量 upsert(MyBatis 实现)mapper.batchUpsert(list);}
}
MyBatis Mapper + SQL
Mapper 接口:
Apply to HallScene.ts
public interface LeaderboardMapper {void batchUpsert(@Param("list") List<Leaderboard> list);// 可加查询方法
}mapper XML(LeaderboardMapper.xml)<insert id="batchUpsert" parameterType="map">INSERT INTO leaderboard (user_id, score, updated_at)VALUES<foreach collection="list" item="item" separator=",">(#{item.userId}, #{item.score}, #{item.updatedAt})</foreach>ON DUPLICATE KEY UPDATEscore = VALUES(score),updated_at = VALUES(updated_at)
</insert>
- 幂等与重复消息:消息包含 messageId,消费端在 Redis/DB 中做幂等检查(如 SETNX messageId:processed 或 Redis Hash 记录已处理 id)。
- 死信与重试:消费失败推到 mq:note:dlq,人工或单独重试流程处理。
- 批量与限流:ZSET 的全量持久化当数据量大时会耗时,建议:
- 只持久化 Top N(按需求),或分批(分页 ZRANGE)。
- 按用户分区并并行持久化。
- 持久化策略:可以每小时写入 DB 覆盖(ON DUPLICATE KEY),也可写入增量表(记录历史)。
- 数据一致性:在关机或部署前优雅停止消费线程并将 Redis 中的变更 flush 到 DB。
- 性能:消费端更新 ZSET 为单条 Redis 请求,若高并发可用 pipeline 或本地批队列合并多条消息后一起 ZINCRBY(减少网络往返)。
- 监控:记录队列长度(LLEN)、consumer 错误率、持久化耗时,设置报警。