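Key steps:
- On startup, Spring Boot creates a `LettuceConnectionFactory`.
- The client is initialized according to the configuration type (cluster, sentinel, or standalone).
- In cluster mode:
  - A `RedisClusterClient` is created.
  - `setOptions(getClusterClientOptions(configuration))` is called to apply the configuration.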
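The selection between client types can be pictured with a small stand-alone sketch. It uses no Spring classes: `ClientModeSketch`, `Mode`, and the two parameters are hypothetical names chosen for illustration, and the order of the checks (sentinel, then cluster, then standalone) follows the `createLettuceConnectionFactory` excerpt in the source listing at the end of this post.

```java
import java.util.List;

// Illustrative model only; these are not Spring or Lettuce types.
final class ClientModeSketch {
    enum Mode { SENTINEL, CLUSTER, STANDALONE }

    /**
     * Picks the client mode in the same order as the factory checks:
     * a sentinel configuration wins, then a cluster configuration,
     * and standalone is the default.
     */
    static Mode resolve(String sentinelMaster, List<String> clusterNodes) {
        if (sentinelMaster != null) {
            return Mode.SENTINEL;
        }
        if (clusterNodes != null && !clusterNodes.isEmpty()) {
            return Mode.CLUSTER;
        }
        return Mode.STANDALONE;
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, List.of("10.0.0.1:6379"))); // prints CLUSTER
        System.out.println(resolve(null, List.of()));                // prints STANDALONE
    }
}
```

Only the `CLUSTER` branch leads to a `RedisClusterClient`, which is why the topology refresh options described in the following sections have no effect in the other two modes.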
2. Node state checking mechanism
Loading the topology refresh options:
```java
private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {
    ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();
    // Fall back to the default options when the client has no cluster options
    return clusterClientOptions != null ? clusterClientOptions.getTopologyRefreshOptions() : FALLBACK_OPTIONS;
}
```
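Condition that triggers a state check:
```java
private boolean isEnabled(RefreshTrigger refreshTrigger) {
    // A trigger is active only if it is in the configured set of adaptive triggers
    return getClusterTopologyRefreshOptions().getAdaptiveRefreshTriggers().contains(refreshTrigger);
}
```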
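The two methods above boil down to "use the configured trigger set, or a fallback, and test membership". Here is a minimal stand-alone sketch of that idea. `TriggerCheckSketch` and its nested `Trigger` enum are hypothetical stand-ins, and the empty fallback set is an assumption about what the fallback options contain, not something shown in the excerpt.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative model only; these are not Lettuce types.
final class TriggerCheckSketch {
    enum Trigger { MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNCOVERED_SLOT, UNKNOWN_NODE }

    // Assumption: the fallback enables no adaptive triggers at all.
    static final Set<Trigger> FALLBACK = EnumSet.noneOf(Trigger.class);

    /** configured may be null, modelling a client without cluster options. */
    static boolean isEnabled(Set<Trigger> configured, Trigger trigger) {
        Set<Trigger> effective = configured != null ? configured : FALLBACK;
        return effective.contains(trigger);
    }

    public static void main(String[] args) {
        Set<Trigger> configured = EnumSet.of(Trigger.PERSISTENT_RECONNECTS, Trigger.MOVED_REDIRECT);
        System.out.println(isEnabled(configured, Trigger.PERSISTENT_RECONNECTS)); // prints true
        System.out.println(isEnabled(configured, Trigger.ASK_REDIRECT));          // prints false
        System.out.println(isEnabled(null, Trigger.PERSISTENT_RECONNECTS));       // prints false
    }
}
```

Under this model, a trigger that was never enabled explicitly is silently ignored, which is why the configuration section later insists on enabling `PERSISTENT_RECONNECTS`.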
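3. How reconnection on failure works
Handling reconnect events:
```java
@Override
public void onReconnectAttempt(int attempt) {
    // Only act when the trigger is enabled and the attempt count has reached the threshold
    if (isEnabled(RefreshTrigger.PERSISTENT_RECONNECTS)
            && attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}
```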
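Workflow:
- When the network connection drops, Lettuce automatically tries to reconnect.
- Each reconnect attempt calls `onReconnectAttempt`.
- The method checks whether the `PERSISTENT_RECONNECTS` trigger is enabled.
- It then checks whether the attempt count has reached the threshold (`refreshTriggersReconnectAttempts`).
- If both conditions hold, a topology refresh event is triggered.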
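The workflow above can be simulated without a Redis cluster. The sketch below is a simplified model, not Lettuce code: `ReconnectGateSketch` is a hypothetical class, and the minimum interval between signals is an assumption standing in for `indicateTopologyRefreshSignal()`, whose body is not shown in this post.

```java
// Illustrative model only; this is not a Lettuce type.
final class ReconnectGateSketch {
    private final boolean persistentReconnectsEnabled; // is the PERSISTENT_RECONNECTS trigger on?
    private final int reconnectAttemptsThreshold;      // plays the role of refreshTriggersReconnectAttempts
    private final long minIntervalNanos;               // assumed rate limit between refresh signals
    private boolean signalledBefore;
    private long lastSignalNanos;

    ReconnectGateSketch(boolean enabled, int threshold, long minIntervalNanos) {
        this.persistentReconnectsEnabled = enabled;
        this.reconnectAttemptsThreshold = threshold;
        this.minIntervalNanos = minIntervalNanos;
    }

    /** Returns true when this attempt should request a topology refresh. */
    synchronized boolean onReconnectAttempt(int attempt, long nowNanos) {
        if (!persistentReconnectsEnabled || attempt < reconnectAttemptsThreshold) {
            return false; // trigger disabled, or threshold not reached yet
        }
        if (signalledBefore && nowNanos - lastSignalNanos < minIntervalNanos) {
            return false; // a refresh was requested recently, so suppress this one
        }
        signalledBefore = true;
        lastSignalNanos = nowNanos;
        return true;
    }

    public static void main(String[] args) {
        long second = 1_000_000_000L;
        ReconnectGateSketch gate = new ReconnectGateSketch(true, 3, 10 * second);
        for (int attempt = 1; attempt <= 5; attempt++) {
            // one reconnect attempt per simulated second
            System.out.println("attempt " + attempt + " -> " + gate.onReconnectAttempt(attempt, attempt * second));
        }
    }
}
```

With a threshold of 3 and a 10-second interval, only attempt 3 requests a refresh in this run: attempts 1 and 2 are below the threshold, and attempts 4 and 5 fall inside the interval.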
4. How the topology refresh is executed
Refresh activation:
```java
private void activateTopologyRefreshIfNeeded() {
    if (getOptions() instanceof ClusterClientOptions) {
        ClusterClientOptions options = (ClusterClientOptions) getOptions();
        ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();
        // Nothing to do if periodic refresh is off or the task is already active
        if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {
            return;
        }
        if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
            // Create the periodic refresh task
            ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(
                    clusterTopologyRefreshScheduler,
                    options.getRefreshPeriod().toNanos(),
                    options.getRefreshPeriod().toNanos(),
                    TimeUnit.NANOSECONDS);
            clusterTopologyRefreshFuture.set(scheduledFuture);
        }
    }
}
```
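Refresh process:
- Check whether periodic refresh is enabled.
- Ensure that only one refresh task is activated (an atomic compare-and-set).
- Create a scheduled task that runs the refresh at the configured interval.
- On each refresh, fetch the cluster topology again.
- Update the client's internal node routing table.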
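The "activate at most once" part of this process is a general concurrency pattern and can be tried with the JDK alone. The sketch below is a simplified model under stated assumptions: `PeriodicRefreshSketch` is a hypothetical class, the `Runnable` stands in for reloading the topology, and milliseconds are used instead of nanoseconds for readability.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model only; this is not a Lettuce type.
final class PeriodicRefreshSketch {
    private final AtomicBoolean activated = new AtomicBoolean(false);
    private final AtomicReference<ScheduledFuture<?>> future = new AtomicReference<>();
    private final ScheduledExecutorService pool;

    PeriodicRefreshSketch(ScheduledExecutorService pool) {
        this.pool = pool;
    }

    /** Returns true only for the single call that actually scheduled the task. */
    boolean activateIfNeeded(boolean periodicEnabled, long periodMillis, Runnable refresh) {
        if (!periodicEnabled || activated.get()) {
            return false; // disabled, or another caller already activated the task
        }
        if (activated.compareAndSet(false, true)) {
            // Only the thread that wins the compare-and-set schedules the task
            future.set(pool.scheduleAtFixedRate(refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
            return true;
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        PeriodicRefreshSketch sketch = new PeriodicRefreshSketch(pool);
        AtomicInteger refreshes = new AtomicInteger();
        Runnable refresh = refreshes::incrementAndGet; // stands in for reloading the topology

        System.out.println("first call scheduled: " + sketch.activateIfNeeded(true, 50, refresh));
        System.out.println("second call scheduled: " + sketch.activateIfNeeded(true, 50, refresh));

        Thread.sleep(300);
        pool.shutdownNow();
        System.out.println("refreshes so far: " + refreshes.get());
    }
}
```

The plain `get()` check is a cheap fast path; the `compareAndSet` is what guarantees a single scheduled task when several threads race through the method at the same time.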
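5. Other trigger mechanisms
Besides reconnect attempts, several other conditions can trigger a refresh:
```java
public enum RefreshTrigger {
    MOVED_REDIRECT,        // MOVED redirection
    ASK_REDIRECT,          // ASK redirection
    PERSISTENT_RECONNECTS, // persistent reconnect attempts
    UNCOVERED_SLOT,        // a slot that no node covers
    UNKNOWN_NODE           // a node the client does not know
}
```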
The corresponding event handler, using `MOVED` redirection as an example:
```java
@Override
public void onMovedRedirection() {
    if (isEnabled(RefreshTrigger.MOVED_REDIRECT)) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}
```
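6. Key points for making the configuration take effect
To make sure the configuration takes effect in Spring Boot:
- Cluster mode must be enabled: the relevant logic runs only when a cluster configuration is used.
- Set `ClientOptions` correctly: inject them through a `LettuceClientConfigurationBuilderCustomizer`.
- Configure the topology refresh options: they must include the `PERSISTENT_RECONNECTS` trigger.
- Set a reasonable reconnect threshold with `refreshTriggersReconnectAttempts`.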
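As a rough illustration of these points, the options can be supplied through a customizer bean instead of a hand-built connection factory. This is a sketch, not a recommended setup: the class name `TopologyRefreshCustomizerConfig`, the bean name, and the threshold of 3 are placeholders, and the import of `LettuceClientConfigurationBuilderCustomizer` assumes the same Spring Boot package layout as the `RedisProperties` import used in the configuration section of this post.

```java
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions.RefreshTrigger;
import org.springframework.boot.autoconfigure.data.redis.LettuceClientConfigurationBuilderCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopologyRefreshCustomizerConfig { // hypothetical class name

    @Bean
    public LettuceClientConfigurationBuilderCustomizer topologyRefreshCustomizer() {
        ClusterTopologyRefreshOptions refreshOptions = ClusterTopologyRefreshOptions.builder()
                // The reconnect-based refresh only fires when this trigger is enabled
                .enableAdaptiveRefreshTrigger(RefreshTrigger.PERSISTENT_RECONNECTS)
                // Placeholder threshold; tune it for the environment
                .refreshTriggersReconnectAttempts(3)
                .build();

        // Cluster-specific options must be ClusterClientOptions, not plain ClientOptions
        return builder -> builder.clientOptions(
                ClusterClientOptions.builder()
                        .topologyRefreshOptions(refreshOptions)
                        .build());
    }
}
```

The customizer only matters when the auto-configured `LettuceConnectionFactory` is in use; if the application defines its own connection factory bean, the options have to be passed to that factory directly.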
7. Complete workflow
[Figure: complete workflow diagram]
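Best practice recommendations:
- Combine multiple triggers: enabling several triggers at once improves the client's awareness of cluster changes.
```java
.enableAdaptiveRefreshTrigger(
        RefreshTrigger.PERSISTENT_RECONNECTS,
        RefreshTrigger.MOVED_REDIRECT,
        RefreshTrigger.UNKNOWN_NODE)
```
- Set a reasonable reconnect threshold: tune it to the network environment.
```java
.refreshTriggersReconnectAttempts(3) // 3 to 5 is suggested for production
```
- Configure the adaptive refresh timeout: it rate-limits trigger-initiated refreshes so that a burst of events does not cause repeated topology reloads.
```java
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
```
- Enable periodic refresh: use it as a complement to trigger-based failure recovery.
```java
.enablePeriodicRefresh(Duration.ofMinutes(10))
```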
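Through these mechanisms, Lettuce can automatically update its topology information when cluster nodes fail or network problems occur, which keeps the client in sync with the cluster and the connection highly available.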
## Configuring reconnection
```java
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Configuration;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

@Configuration
public class RedisClusterConfig {

    @Bean
    public LettuceClientConfiguration lettuceClientConfiguration() {
        // Configure the cluster topology refresh options
        ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
                .enablePeriodicRefresh(true)           // turn on periodic refresh
                .refreshPeriod(Duration.ofSeconds(30)) // refresh every 30 seconds
                .enableAllAdaptiveRefreshTriggers()
                // Alternatively, enable the triggers one at a time:
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.ASK_REDIRECT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNCOVERED_SLOT)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.UNKNOWN_NODE)
                // .enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
                .adaptiveRefreshTriggersTimeout(Duration.ofSeconds(10))
                .refreshTriggersReconnectAttempts(5)
                .build();

        return LettuceClientConfiguration.builder()
                .clientOptions(ClusterClientOptions.builder()
                        .topologyRefreshOptions(topologyRefreshOptions)
                        .build())
                .build();
    }

    @Bean
    public LettuceConnectionFactory redisConnectionFactory(RedisProperties redisProperties,
            LettuceClientConfiguration lettuceClientConfiguration) {
        RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
        RedisClusterConfiguration config = new RedisClusterConfiguration(clusterProperties.getNodes());
        if (clusterProperties.getMaxRedirects() != null) {
            config.setMaxRedirects(clusterProperties.getMaxRedirects());
        }
        if (redisProperties.getPassword() != null) {
            config.setPassword(RedisPassword.of(redisProperties.getPassword()));
        }
        return new LettuceConnectionFactory(config, lettuceClientConfiguration);
    }

    // Optional: configure a RedisTemplate
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```
## Source code
```java
// Spring Boot: LettuceConnectionConfiguration
@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
LettuceConnectionFactory redisConnectionFactory(
        ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
        ClientResources clientResources) throws UnknownHostException {
    LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
            getProperties().getLettuce().getPool());
    return createLettuceConnectionFactory(clientConfig);
}

private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
    if (getSentinelConfig() != null) {
        return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);
    }
    if (getClusterConfiguration() != null) {
        return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);
    }
    return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);
}

// Spring Data Redis: LettuceConnectionFactory
public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,
        LettuceClientConfiguration clientConfig) {
    this(clientConfig);
    Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null!");
    this.configuration = clusterConfiguration;
}

// createClient() creates the client
public void afterPropertiesSet() {
    this.client = createClient();
    this.connectionProvider = createConnectionProvider(client, CODEC);
    this.reactiveConnectionProvider = createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC);
    if (isClusterAware()) {
        this.clusterCommandExecutor = new ClusterCommandExecutor(
                new LettuceClusterTopologyProvider((RedisClusterClient) client),
                new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
                EXCEPTION_TRANSLATION);
    }
    if (getEagerInitialization() && getShareNativeConnection()) {
        initConnection();
    }
}

// In cluster mode the configuration is applied to the cluster client with
// clusterClient.setOptions(getClusterClientOptions(configuration))
protected AbstractRedisClient createClient() {
    if (isStaticMasterReplicaAware()) {
        RedisClient redisClient = clientConfiguration.getClientResources()
                .map(RedisClient::create)
                .orElseGet(RedisClient::create);
        clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
        return redisClient;
    }
    if (isRedisSentinelAware()) {
        RedisURI redisURI = getSentinelRedisURI();
        RedisClient redisClient = clientConfiguration.getClientResources()
                .map(clientResources -> RedisClient.create(clientResources, redisURI))
                .orElseGet(() -> RedisClient.create(redisURI));
        clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
        return redisClient;
    }
    if (isClusterAware()) {
        List<RedisURI> initialUris = new ArrayList<>();
        ClusterConfiguration configuration = (ClusterConfiguration) this.configuration;
        for (RedisNode node : configuration.getClusterNodes()) {
            initialUris.add(createRedisURIAndApplySettings(node.getHost(), node.getPort()));
        }
        RedisClusterClient clusterClient = clientConfiguration.getClientResources()
                .map(clientResources -> RedisClusterClient.create(clientResources, initialUris))
                .orElseGet(() -> RedisClusterClient.create(initialUris));
        clusterClient.setOptions(getClusterClientOptions(configuration));
        return clusterClient;
    }
    RedisURI uri = isDomainSocketAware()
            ? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket())
            : createRedisURIAndApplySettings(getHostName(), getPort());
    RedisClient redisClient = clientConfiguration.getClientResources()
            .map(clientResources -> RedisClient.create(clientResources, uri))
            .orElseGet(() -> RedisClient.create(uri));
    clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
    return redisClient;
}

// Lettuce: ClusterTopologyRefreshScheduler
private ClusterTopologyRefreshOptions getClusterTopologyRefreshOptions() {
    ClusterClientOptions clusterClientOptions = redisClusterClient.getClusterClientOptions();
    if (clusterClientOptions != null) {
        return clusterClientOptions.getTopologyRefreshOptions();
    }
    return FALLBACK_OPTIONS;
}

// Lettuce: ClusterClientOptions
public ClusterTopologyRefreshOptions getTopologyRefreshOptions() {
    return topologyRefreshOptions;
}

// Lettuce: ClusterTopologyRefreshScheduler
private boolean isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger refreshTrigger) {
    return getClusterTopologyRefreshOptions().getAdaptiveRefreshTriggers().contains(refreshTrigger);
}

@Override
public void onReconnectAttempt(int attempt) {
    if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
            && attempt >= getClusterTopologyRefreshOptions().getRefreshTriggersReconnectAttempts()) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}

@Override
public void onMovedRedirection() {
    if (isEnabled(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT)) {
        if (indicateTopologyRefreshSignal()) {
            emitAdaptiveRefreshScheduledEvent();
        }
    }
}

// Lettuce: RedisClusterClient
private void activateTopologyRefreshIfNeeded() {
    if (getOptions() instanceof ClusterClientOptions) {
        ClusterClientOptions options = (ClusterClientOptions) getOptions();
        ClusterTopologyRefreshOptions topologyRefreshOptions = options.getTopologyRefreshOptions();
        if (!topologyRefreshOptions.isPeriodicRefreshEnabled() || clusterTopologyRefreshActivated.get()) {
            return;
        }
        if (clusterTopologyRefreshActivated.compareAndSet(false, true)) {
            ScheduledFuture<?> scheduledFuture = genericWorkerPool.scheduleAtFixedRate(
                    clusterTopologyRefreshScheduler,
                    options.getRefreshPeriod().toNanos(),
                    options.getRefreshPeriod().toNanos(),
                    TimeUnit.NANOSECONDS);
            clusterTopologyRefreshFuture.set(scheduledFuture);
        }
    }
}
```