缓存与分布式锁
即时性、数据一致要求不高的
访问量大且更新频率不高的数据
(读多,写少)
常用缓存中间件 redis
Spring
如果用spring的情况下,由于redis没有受spring的管理,
则我们需要自己先写一个redis的配置类,写好方法把redistemplate 作为bean return出来受 spring管理,
(很老的方法了,但现在企业也在用)或者 直接写xml进行注册,然后放入web.xml中
<context-param><param-name>contextConfigLocation</param-name><param-value>/WEB-INF/classes/applicationContext.xml,/WEB-INF/classes/config/spring/xjtec_helps/spring_xjtec_helps_taskhelp_starts.xml,/WEB-INF/classes/spring-session.xml</param-value></context-param>
Springboot 或cloud
引入maven redis的starter,然后spring把基础工作都给你写好了,你只需要引入,按照规则前缀,在配置中写好配置值,想用的时候 @autowired进来用就完事了
配置类
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//package org.springframework.boot.autoconfigure.data.redis;import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;@Configuration(proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
//所有的配置信息通过RedisProperties.class得到,这里引入过来
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {public RedisAutoConfiguration() {}@Bean@ConditionalOnMissingBean(name = {"redisTemplate"})@ConditionalOnSingleCandidate(RedisConnectionFactory.class)public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {RedisTemplate<Object, Object> template = new RedisTemplate();template.setConnectionFactory(redisConnectionFactory);return template;}@Bean@ConditionalOnMissingBean@ConditionalOnSingleCandidate(RedisConnectionFactory.class)public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {return new StringRedisTemplate(redisConnectionFactory);}
}//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//package org.springframework.boot.autoconfigure.data.redis;import java.time.Duration;
import java.util.List;
import org.springframework.boot.context.properties.ConfigurationProperties;@ConfigurationProperties(
//配置文件中,按照这个前缀写就可以了prefix = "spring.redis"
)
public class RedisProperties {private int database = 0;private String url;private String host = "localhost";private String password;private int port = 6379;private boolean ssl;private Duration timeout;private String clientName;private Sentinel sentinel;private Cluster cluster;private final Jedis jedis = new Jedis();private final Lettuce lettuce = new Lettuce();public static class Pool {private int maxIdle = 8;private int minIdle = 0;private int maxActive = 8;private Duration maxWait = Duration.ofMillis(-1L);private Duration timeBetweenEvictionRuns;public Pool() {}public int getMaxIdle() {return this.maxIdle;}public void setMaxIdle(int maxIdle) {this.maxIdle = maxIdle;}public int getMinIdle() {return this.minIdle;}public void setMinIdle(int minIdle) {this.minIdle = minIdle;}public int getMaxActive() {return this.maxActive;}public void setMaxActive(int maxActive) {this.maxActive = maxActive;}public Duration getMaxWait() {return this.maxWait;}public void setMaxWait(Duration maxWait) {this.maxWait = maxWait;}public Duration getTimeBetweenEvictionRuns() {return this.timeBetweenEvictionRuns;}public void setTimeBetweenEvictionRuns(Duration timeBetweenEvictionRuns) {this.timeBetweenEvictionRuns = timeBetweenEvictionRuns;}}
}
则会自动引入
三级目录的展示,通过缓存一点一点优化
最基本的缓存的加入
缓存中有则直接用,无则进入数据库查询并存放
@Overridepublic Map<String, List<Catalog2VO>> getCatalogJson() {//给缓存中放json字符串,拿出的json字符串,还用逆转为能用的对象类型 【序列化与反序列化】//1.加入缓存逻辑String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");if(StringUtils.isEmpty(catalogJSON)){//2.缓存中没有,查询数据库Map<String, List<Catalog2VO>> catalogJsonFromDb = getCatalogJsonFromDb();//3.查到的数据再放入缓存,将对象转为json放在缓存中String s = JSON.toJSONString(catalogJsonFromDb);redisTemplate.opsForValue().set("catalogJSON",s);}//转为我们指定的对象Map<String,List<Catalog2VO>> result = JSON.parseObject(catalogJSON,new TypeReference<Map<String,List<Catalog2VO>>>(){});return result;}
为了解决击穿的问题,优化代码
缓存中所遇到的三种问题
这块代码只能在单项目(this)中能控制到1人进到数据库查找值,分布式时则控制不住(分了几台服务器就有几个人能进),但也比所有有人进入到数据库查找好
锁中的代码,一定要由三部分,
1.查缓存,又则返回
2.查数据库
3.再入缓存
//首先尝试缓存拿值@Overridepublic Map<String, List<Catalog2VO>> getCatalogJson() {//给缓存中放json字符串,拿出的json字符串,还用逆转为能用的对象类型 【序列化与反序列化】//1.加入缓存逻辑String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");if(StringUtils.isEmpty(catalogJSON)){//2.缓存中没有,查询数据库Map<String, List<Catalog2VO>> catalogJsonFromDb = getCatalogJsonFromDb();return catalogJsonFromDb;}//转为我们指定的对象Map<String,List<Catalog2VO>> result = JSON.parseObject(catalogJSON,new TypeReference<Map<String,List<Catalog2VO>>>(){});return result;}//从数据库查询并封装分类数据public Map<String, List<Catalog2VO>> getCatalogJsonFromDb() {//1.synchronized(this): SpringBoot所有的组件在容器中都是单列的,则此时this可以进行锁住,但分布式时无法锁住synchronized (this){//得到锁以后,我们应该再去缓存中查找一次,以防有人已经查到了String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");if(!StringUtils.isEmpty(catalogJSON)){//进锁后,有可能别人已经提前进来并且找到了,则直接缓存拿到返回即可Map<String,List<Catalog2VO>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catalog2VO>>>(){});return result;}//1.将数据库的多次查询变为一次List<CategoryEntity> selectList = baseMapper.selectList(null);List<CategoryEntity> level1Categorys = getParent_cid(selectList,0L);Map<String,List<Catalog2VO>> parent_cid = level1Categorys.stream().collect(Collectors.toMap(k->k.getCatId().toString(),v->{List<CategoryEntity> categoryEntities = getParent_cid(selectList,v.getCatId());List<Catalog2VO> catalog2VOS = null;if(categoryEntities!=null){catalog2VOS = categoryEntities.stream().map(l2->{Catalog2VO catalog2VO = new Catalog2VO(v.getCatId().toString(),null,l2.getCatId().toString(),l2.getName());//1、找当前二级分类的三级分类封装成voList<CategoryEntity> level3Catelog = getParent_cid(selectList, l2.getCatId());if(level3Catelog!=null){List<Catalog2VO.Catalog3Vo> collect = level3Catelog.stream().map(l3->{Catalog2VO.Catalog3Vo catalog3Vo = new Catalog2VO.Catalog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());return catalog3Vo;}).collect(Collectors.toList());catalog2VO.setCatalog3List(collect);}return catalog2VO;}).collect(Collectors.toList());}return catalog2VOS;}));//3.查到的数据再放入缓存,将对象转为json放在缓存中String s = JSON.toJSONString(parent_cid);redisTemplate.opsForValue().set("catalogJSON",s,1, TimeUnit.DAYS);return parent_cid;}}
本地尝试分布式
赋值多台服务器但注意端口分开
然后nginx访问过来时,由网关帮我们赋值均衡
根据主机名进行断言拦截到请求,然后转到uri
穿透
访问的数据,数据库不存在,则也无法放入缓存中,则不断访问数据库
设置null值返回
雪崩
key大批量同时过期
设置过期时间时加入随机值
击穿
热点key,过期后,同时又大量的值进行访问到数据库了
给热点key加锁,不能让所有人都去数据库进行查询
分布式锁
小提示
docker中引进的redis,通过命令 docker exec -it redis redis-cli
进入到redis的操作界面
同过redis的setnx进行一次简易的分布式锁
public Map<String, List<Catalog2VO>> getCatalogJsonFromDbWithRedisLock() {//1、占分布式锁。去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");//这里的操作就是redis的setnx操作if(lock){//加锁成功..执行业务Map<String, List<Catalog2VO>> dataFromDb = getDataFromDb();redisTemplate.delete("lock");//操作完成后,释放锁return dataFromDb;}else{//加锁失败。。则重试return getCatalogJsonFromDbWithRedisLock();//自旋的方式,不断重新尝试}}
简易锁的缺点
解锁之前,出异常了,则会导致死锁
补救:加锁时一定要带上过期时间
eg. set lock 111 EX 300 NX(redis命令)
加锁和过期时间的设置要进行原子操作
public Map<String, List<Catalog2VO>> getCatalogJsonFromDbWithRedisLock() {//1、占分布式锁。去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111",300, TimeUnit.SECONDS);//这里的操作就是redis的setnx操作if(lock){//加锁成功..执行业务Map<String, List<Catalog2VO>> dataFromDb = getDataFromDb();redisTemplate.delete("lock");//操作完成后,释放锁return dataFromDb;}else{//加锁失败。。则重试return getCatalogJsonFromDbWithRedisLock();//自旋的方式,不断重新尝试}}
删锁时也要进行原子操作,并且要避免不要删除掉别人的加的锁,所有锁的值要是唯一的,删除要进行原子操作时,运行lua脚本
public Map<String, List<Catalog2VO>> getCatalogJsonFromDbWithRedisLock() {String UUID = java.util.UUID.randomUUID().toString();//1、占分布式锁。去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",UUID,300, TimeUnit.SECONDS);//这里的操作就是redis的setnx操作if(lock){//加锁成功..执行业务Map<String, List<Catalog2VO>> dataFromDb = getDataFromDb();String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";redisTemplate.execute(new DefaultRedisScript<Integer>(script,Integer.class),Arrays.asList("lock"), UUID);return dataFromDb;}else{//加锁失败。。则重试return getCatalogJsonFromDbWithRedisLock();//自旋的方式,不断重新尝试}}
最终版本
public Map<String, List<Catalog2VO>> getCatalogJsonFromDbWithRedisLock() {String UUID = java.util.UUID.randomUUID().toString();//1、占分布式锁。去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",UUID,300, TimeUnit.SECONDS);//这里的操作就是redis的setnx操作Map<String, List<Catalog2VO>> dataFromDb = new HashMap<>();if(lock){try{//加锁成功..执行业务dataFromDb = getDataFromDb();}finally {String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";redisTemplate.execute(new DefaultRedisScript<Integer>(script,Integer.class),Arrays.asList("lock"), UUID);}return dataFromDb;}else{//加锁失败。。则重试return getCatalogJsonFromDbWithRedisLock();//自旋的方式,不断重新尝试}}
分布式锁框架 redisson
引用springboot的redissonstarter则基本不用自己再写代码,只需配置文件写好响应的属性即可
首次使用则我们自己进行配置一下,则只单独引入redisson的配置
写好配置类(放入到ioc容器中,后面则可直接使用)
单个redis的情况,集群则写入多个ip
@Configuration()
public class MyRedissonConfig {@Bean(destroyMethod="shutdown")public RedissonClient redisson() throws IOException{//创建配置Config config = new Config();config.useSingleServer().setAddress("redis://192.168.29.103:6379");//根据conf创建使用redisson所必须要的 RedissonClient示例RedissonClient redissonClient = Redisson.create(config);return redissonClient;}
}
redisson的锁都是可重入锁
可重入锁:
可重入锁是指一个线程可以重复获取它已经持有的锁。当一个线程已经持有某个锁时,它可以再次获取该锁而不会被阻塞,这种特性称为"可重入性"(Reentrancy)。重入计数:每次获取锁计数器加1,释放时减1,计数器为0时锁才真正释放
避免死锁:允许同一线程多次获取同一把锁
公平性选择:ReentrantLock可以配置为公平锁或非公平锁
中断响应:ReentrantLock支持在等待锁时响应中断
redisson的api用法与lock的api用法相同
redisson的基础应用
@GetMapping("/hello")public String hello(){//1.获取一把锁,只要锁的名字一样,就是同一把锁RLock lock = redisson.getLock("my-lock");//2.加锁lock.lock();//阻塞式等待,默认加的锁30s,只要这边把锁放了,其他人可以获得到//看门狗,业务时间长时,自动给锁续期//业务完成,(获取不到线程id)则不再续期,不手动解锁的话,30s后自动解锁//2.1 可以自己指定过期时间,但指定时间后,不会触发开门狗,则不会自动续期,我们指定了时间后,就会去执行 固定的脚本//lock.lock(10,TimeUnit.SECONDS);//2.2lock.tryLock(100,10,TimeUnit.SECONDS); 可以设定线程等待锁的时间,超过则放弃//实际应用时,则还是需要指定时间,不需要续期,只要指定的时间长一点即可(超过业务时间)try {System.out.println("加锁成功,执行业务..."+Thread.currentThread().getId());Thread.sleep(3000);} catch (InterruptedException e) {throw new RuntimeException(e);}finally {lock.unlock();}}
读写锁(也是可重入锁,当前线程可对锁多次操作)
//通过读写锁,能保证读的数据一定是最新的, 写锁是一个排他锁,读锁是一个共享锁//写锁没释放时,其他人既不能写也不能读,没有写锁时,则读锁就像不存在一样
//正在读的过程中,写锁也一定要等待读锁释放@GetMapping("/write")@ResponseBodypublic String writeValue(){//读写锁RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");String s = "";RLock rLock = lock.writeLock();try{//1、操作数据库 则加写锁 , 读数据加读锁rLock.lock();s = UUID.randomUUID().toString();Thread.sleep(3000);stringRedisTemplate.opsForValue().set("writeValue",s);} catch (InterruptedException e) {e.printStackTrace();}finally {rLock.unlock();}return s;}@GetMapping("/read")@ResponseBodypublic String readValue(){RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");String s = "";RLock rLock = lock.readLock();rLock.lock();try{s = stringRedisTemplate.opsForValue().get("writeValue");} catch (Exception e) {e.printStackTrace();}finally {rLock.unlock();}return s;}
信号量(主要用于限流操作,可以设置线程上线)
@GetMapping("/park")@ResponseBodypublic String park() throws InterruptedException{RSemaphore park = redisson.getSemaphore("park");//park.acquire();//直接获取一个信号量,如果此时信号量没有了,则会卡住//则通常用下面的方法,得到boolean返回值boolean b = park.tryAcquire();if(b){//执行业务}else{//返回提示return "fail";}return "ok=>"+b;}@GetMapping("/go")@ResponseBodypublic String go() throws InterruptedException{RSemaphore park = redisson.getSemaphore("park");park.release();return "ok";}
闭锁
/*** 等待5个班级都没人后,才能锁大门*/@GetMapping("/lockDoor")@ResponseBodypublic String lockDoor() throws InterruptedException{RCountDownLatch door = redisson.getCountDownLatch("door");door.trySetCount(5);//设置好锁需要达到的数量door.await();//等待闭锁的完成//到达数量后则可以放行return "放假了...";}@GetMapping("/gogogo/{id}")public String gogogo(@PathVariable("id") Long id){RCountDownLatch door = redisson.getCountDownLatch("door");door.countDown();//触发一次,则减少一个return id+"班的人都走了";}
缓存一致性
/**(根据自己的业务仔细判断)* 缓存里面的数据如何和数据库保持一致* 缓存数据一致性* 1)、双写模式 操作数据库的时候 ,重新缓存保存* 2)、失效模式 操作数据库的时候,直接删除相应缓存* 但这两个操作如果不加锁的话,还是会导致脏数据* 加锁后性能很低,要加也要加读写锁* 总结:* 我们能放入缓存的数据本就不应该是实时性,一致性要求超高的数据,所以一般设定好过期时间都是能正常使用的* 我们不应该过度设计,增加系统的复杂性* 遇到频繁修改、实时要求高,那就不要缓存、不加锁、自己查数据库就好了* 阿里有一个cannal* @return*/
SpringCache
不同的CacheManager 都对缓存进行了进一次的分区,例如对于redis的CacheManager,虽然缓存都是存在redis得同一个库,但到这边还是进行了分区处理
整合SpringCache简化缓存开发
- 引入依赖
spring-boot-starter-cache
spring-boot-starter-data-redis //我们用redis进行的缓存
2)写配置
(1)自动配置了那些
CacheAutoConfiguration会导入 RedisCacheConfiguration;
自动配置好了缓存管理器RedisCacheManager
(2)配置使用了redis作为缓存
spring.cache.type = redis
3)测试使用缓存
@Cacheable 触发将数据保存到缓存的操作
@CacheEvict 触发将数据从缓存删除的操作
@CachePut 不影响方法的执行进行缓存的更新
@Cacheing 组合以上多个操作
@CacheConfig 在类别共享缓存的相同配置
3)测试使用缓存
主类加注解 启动缓存 @EnbaleCaching
然后进行相应的注解进行相应的缓存操作
// 每一个需要缓存的数据我们都来指定要放到那个名字的缓存【缓存的分区(按照业务类型分)】@Cacheable({"category"})//代表当前方法的结果需要缓存,如果缓存中有,则不调用此方法@Overridepublic List<CategoryEntity> getLevel1Categorys() {System.out.println("调用了getLevel1Categorys...");// 查询父id=0return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));}
自定义调整缓存配置
/*** 启动Cacheable注释能进行缓存的使用 是有默认值的* key自动生产名字* 缓存的值默认使用jdk序列化* 默认ttl时间 -1* * 则这三样内容我们肯定要进行自定义的* 注解中定义了 key值 通过表达式或者字符串的形式定义key值* 缓存时间则需要在配置文件中进行修改* 为了更好的迁移性 在缓存信息保存为json形式,调整格式时,就需要写类调整配置类了* @return*/// 每一个需要缓存的数据我们都来指定要放到那个名字的缓存【缓存的分区(按照业务类型分)】@Cacheable(value = {"category"},key = "'level1Categorys'")//代表当前方法的结果需要缓存,如果缓存中有,则不调用此方法@Overridepublic List<CategoryEntity> getLevel1Categorys() {System.out.println("调用了getLevel1Categorys...");// 查询父id=0return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0));}
自定义配置类
package com.atguigu.gulimall.product.config;import org.springframework.boot.autoconfigure.cache.CacheProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;/*** 通过注解 @EnableConfigurationProperties 可以绑定配置类* 但为什么我们要通过注解的形式拿到该类(CacheProperties)和该类的值呢,因为CacheProperties并不是受spring容器管理,则通过这种形式导入*/
@EnableConfigurationProperties(CacheProperties.class)
@Configuration
//把启动注解放在我们的自己自定义缓存配置上面
@EnableCaching
//自定义缓存配置
public class MyCacheConfig {//在类CacheProperties的源码我们可以看到/*** @ConfigurationProperties(prefix = "spring.cache") 是通过这个前缀和自己的属性进行值匹配的** 然后为了让这个类生效 我们就加了顶部的 @EnableConfigurationProperties(CacheProperties.class)*///@Autowired 可以通过属性的方式得到//CacheProperties cache//这里是该redis的相关配置,要该其他的,导入其他的配置类在按这要写就可以了@Bean //放在参数上面 也是可以直接导入值的RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){//拿到最原始的实例,然后进行修改覆盖即可RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();//设置key的序列化方式config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));//主要就是把默认jdk序列化的形式 转为jsonconfig = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));CacheProperties.Redis redis = cacheProperties.getRedis();//将配置文件中的所以配置都生效if(redis.getTimeToLive()!=null){config = config.entryTtl(redis.getTimeToLive());}if(redis.getKeyPrefix()!=null){config = config.prefixKeysWith(redis.getKeyPrefix());}if(!redis.isCacheNullValues()){config = config.disableCachingNullValues();}if(!redis.isUseKeyPrefix()){config = config.disableKeyPrefix();}return config;}}
再看下我们的配置文件中进行了那些配置
spring.cache.type=redis
spring.cache.redis.time-to-live=360000
#指定我们的缓存前缀,如果没有就用缓存的名字作为前缀
spring.cache.redis.key-prefix=CACHE_
#但需要手动开启
spring.cache.redis.use-key-prefix=false
#防止缓存击穿 是否缓存空值
spring.cache.redis.cache-null-values=true
数据更新时,缓存注释的操作
/*** 级联更新所有关联的数据* @param category*/ //注意key为固定字符串时一定要加单引号,不然会被当作动态的表达式//删除单个指定key缓存 @CacheEvict(value = "category", key = "'getLevel1Categorys'")//写的方法上加入该注解@CacheEvict 进行的失效模式,删除之前的缓存//删除多个指定key缓存 则要用到注解 @Caching 可以进行组合操作@Caching(evict = {@CacheEvict(value = "category", key = "'getLevel1Categorys'"),@CacheEvict(value = "category", key = "'getCatalogJson'")})//直接进行分区删除缓存 更为方便@CacheEvict(value = "category",allEntries = true)//如果数据返回值时 用注解CachePut 则会把数据写入缓存中@CachePut@Transactional@Overridepublic void updateCascade(CategoryEntity category) {this.updateById(category);categoryBrandRelationService.updateCategory(category.getCatId(),category.getName());}