秒杀商品的代码/**
* 高并发场景下,会出现数据不一致,如:超卖现象
* @return
*/
@GetMapping("/sk")
public String seckillHandler() {// 从redis中获取库存String stock = srt.opsForValue().get("sk:0008");// 将类型转化为整形int amount = stock == null ? 0 : Integer.parseInt(stock);// 如果库存量大于0,就抢购if (amount > 0) {// 修改库存后,写回redissrt.opsForValue().set("sk:0008", String.valueOf(--amount));return "库存剩余" + amount + "台";}return "抱歉,您没有抢到";
}
setnx实现方式原理setnx实现方式原理:setnx 只有在指定 key 不存在时才能执行成功,分布式系统中的哪个节点抢先成功执行了 setnx,谁就抢到了锁,谁就拥有对共享资源的操作权限。其它节点只能等待锁的释放。一旦拥有锁的节点对共享资源操作完毕,其就主动删除该 key,即释放锁。然后,其它节点就可重新使用 setnx 命令抢注该 key,即抢注锁。setnx实现在 Controller 类中添加一个 String 常量,作为 Redis 锁的 key
// 分布式锁
public static final String REDIS_LOCK = "lock";
@Autowired
StringRedisTemplate srt;@Autowired
private Redisson redisson;
实现具体的逻辑
/*** 解决超卖的情况,同步synchronized 只适用于单服务器,不适合实际环境* @return*/
@GetMapping("/sk1")
public String seckillHandler1() {synchronized (this) {String result = "抱歉,您没有抢到";// 从redis中获取库存String stock = srt.opsForValue().get("sk:0008");// 将类型转化为整形int amount = stock == null ? 0 : Integer.parseInt(stock);// 如果库存量大于0,就抢购if (amount > 0) {srt.opsForValue().set("sk:0008", String.valueOf(--amount));result = "库存剩余" + amount + "台";System.out.println(result);}return result + "[server is" + serverPort + "]";}
}
setnx实现存在的问题: expire 命令为 key 指定过期时间,还有一种是在 setnx 命令中直接给出该 key 的过期时间。第一种方式中 setnx 与 expire 命令是分别执行的,不具备原子性,仍然可能会出现问题。而第二种方式则是直接在 setnx 中完成了两步操作,具有原子性。所以,采用第二种方式。/*** 解决app节点主机,在未来得及释放锁宕机隐患——为锁设置过期时间* @return*/
@GetMapping("/sk3")
public String seckillHandler3() {String result;try {// 添加锁//Boolean ISLock = srt.opsForValue().setIfAbsent(REDIS_LOCK, "lock");//设置锁的过期时间//srt.expire(REDIS_LOCK,5, TimeUnit.SECONDS);//在添加锁的同时,为该锁指定过期时间,该操作具有原子性Boolean ISLock = srt.opsForValue().setIfAbsent(REDIS_LOCK, "lock",5,TimeUnit.SECONDS);if (!ISLock) {return "没抢到锁";}result = "抱歉,您没有抢到";// 从redis中获取库存String stock = srt.opsForValue().get("sk:0008");// 将类型转化为整形int amount = stock == null ? 0 : Integer.parseInt(stock);// 如果库存量大于0,就抢购if (amount > 0) {srt.opsForValue().set("sk:0008", String.valueOf(--amount));result = "库存剩余" + amount + "台";System.out.println(result);}} finally {// 释放锁srt.delete(REDIS_LOCK);}return result + "[server is" + serverPort + "]";
}
/*** 锁指定的过期时间为 5 秒,如果 seckillHandler3()方法的业务逻辑比较复杂,* 需要调用其它微服务处理。如果请求 a 的处理时间超过了 5 秒(假设 6 秒),而当 5 秒钟过去* 后,这个锁自动过期了。由于锁已过期,另一个请求 b 通过 setnx 申请到了锁。此时如果耗* 时 6 秒的请求 a 处理完了,回来继续执行程序,请求 a 就会把请求 b 设置的锁给删除了。此* 时其它请求就可申请到锁,并与请求 b 同时访问共享资源,很可能会引发数据的不一致。** 解决方法:锁的拥有者,才可以释放锁*/@GetMapping("/sk4")public String seckillHandler4() {//为每一个访问的客户端随机生成一个客户端唯一标识//将锁的value设置为clientIdString clientId = UUID.randomUUID().toString();String result;try {// 添加锁//Boolean ISLock = srt.opsForValue().setIfAbsent(REDIS_LOCK, "lock");//设置锁的过期时间//srt.expire(REDIS_LOCK,5, TimeUnit.SECONDS);//在添加锁的同时,为该锁指定过期时间,该操作具有原子性Boolean ISLock = srt.opsForValue().setIfAbsent(REDIS_LOCK, clientId,5,TimeUnit.SECONDS);if (!ISLock) {return "没抢到锁";}result = "抱歉,您没有抢到";// 从redis中获取库存String stock = srt.opsForValue().get("sk:0008");// 将类型转化为整形int amount = stock == null ? 0 : Integer.parseInt(stock);// 如果库存量大于0,就抢购if (amount > 0) {srt.opsForValue().set("sk:0008", String.valueOf(--amount));result = "库存剩余" + amount + "台";System.out.println(result);}} finally {if(srt.opsForValue().get(REDIS_LOCK).equals(clientId)) {// 释放锁srt.delete(REDIS_LOCK);}}return result + "[server is" + serverPort + "]";}
Lua脚本jedis.eval()jedis依赖redis.clients jedis
@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private Integer redisPort;
/*** 在 finally{}中对于删除锁的客户端身份的判断与删除锁操作是两个语句,不具有原子性,* 在并发场景下可能会出问题。* 对客户端身份的判断与删除锁操作的合并,是没有专门的原子性命令的。此时可以通过* Lua 脚本来实现它们的原子性。*/
@GetMapping("/sk5")
public String seckillHandler5() {//为每一个访问的客户端随机生成一个客户端唯一标识//将锁的value设置为clientIdString clientId = UUID.randomUUID().toString();String result;try {// 添加锁//Boolean ISLock = srt.opsForValue().setIfAbsent(REDIS_LOCK, "lock");//设置锁的过期时间//srt.expire(REDIS_LOCK,5, TimeUnit.SECONDS);//在添加锁的同时,为该锁指定过期时间,该操作具有原子性Boolean ISLock = srt.opsForValue().setIfAbsent(REDIS_LOCK, clientId,5,TimeUnit.SECONDS);if (!ISLock) {return "没抢到锁";}result = "抱歉,您没有抢到";// 从redis中获取库存String stock = srt.opsForValue().get("sk:0008");// 将类型转化为整形int amount = stock == null ? 0 : Integer.parseInt(stock);// 如果库存量大于0,就抢购if (amount > 0) {srt.opsForValue().set("sk:0008", String.valueOf(--amount));result = "库存剩余" + amount + "台";System.out.println(result);}} finally {JedisPool jedisPool = new JedisPool(redisHost, redisPort);try(Jedis jedis=jedisPool.getResource()){//定义lua脚本,注意,每行最后都要有一个空格//redis.call()是Lua中Redis命令的调用函数String sctipt="if redis.call('get',KEYS[1]==ARGV[1])"+"then return redis.call('del',KEYS[1])"+"return 0"+"end";Object eval = jedis.eval(sctipt,Collections.singletonList(REDIS_LOCK),Collections.singletonList(clientId));if("1".equals(eval.toString())){System.out.println("释放锁成功");}else{System.out.println("释放锁时,发生异常!");}}}return result + "[server is" + serverPort + "]";
}
Redisson可重入锁Redisson依赖org.redisson redisson 3.20.0
Application:在 Application 中添加一个由单 Redis 节点构建的 Redisson 的 Bean@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private Integer redisPort;
@Bean
public Redisson redisson() {Config config = null;config.useSingleServer().setAddress(redisHost+":"+redisPort).setDatabase(0);return (Redisson) Redisson.create(config);
}
@Autowired
private Redisson redisson;
/*** 收到锁过期时间的影响,仍然存在并发问题* 请求 a 的锁过期,但其业务还未执行完毕;请求 b 申请到* 了锁,其也正在处理业务。如果此时两个请求都同时修改了共享的库存数据,那么就又会出* 现数据不一致的问题,即仍然存在并发问题。* 对于该问题,可以采用“锁续约”方式解决。* 在当前业务进程开始执行时,fork 出一个子进程,用于启动一个定时任务。* 该定时任务的定时时间小于锁的过期时间,其会定时查看处理当前请求的业务进程的锁是否已被删除。* 如果已被删除,则子进程结束;如果未被删除,说明当前请求的业务还未处理完毕,* 则将锁的时间重新设置为“原过期时间”* @return*/
@GetMapping("/sk6")
public String seckillHandler6() {String result = null;RLock rLock = redisson.getLock(REDIS_LOCK);try {// 添加锁//指定锁的过期时间为5秒,如果申请锁失败, 则最长等待20秒。Boolean ISLock = rLock.tryLock(20,5,TimeUnit.SECONDS);if (!ISLock) {return "没抢到锁";}result = "抱歉,您没有抢到";// 从redis中获取库存String stock = srt.opsForValue().get("sk:0008");// 将类型转化为整形int amount = stock == null ? 0 : Integer.parseInt(stock);// 如果库存量大于0,就抢购if (amount > 0) {srt.opsForValue().set("sk:0008", String.valueOf(--amount));result = "库存剩余" + amount + "台";System.out.println(result);}}catch (Exception e){e.printStackTrace();}finally {//释放锁rLock.unlock();}return result + "[server is" + serverPort + "]";
}
Redisson红锁@Bean("redisson-1")public Redisson redisson1() {Config config = null;config.useSentinelServers().setMasterName("myMaster1").addSentinelAddress("redis:16380","redis:16381","redis:16382");return (Redisson) Redisson.create(config);}@Bean("redisson-2")public Redisson redisson2() {Config config = null;config.useSentinelServers().setMasterName("myMaster2").addSentinelAddress("redis:26380","redis:26381","redis:26382");return (Redisson) Redisson.create(config);}@Bean("redisson-3")public Redisson redisson3() {Config config = null;config.useSentinelServers().setMasterName("myMaster3").addSentinelAddress("redis:36380","redis:36381","redis:36382");return (Redisson) Redisson.create(config);}
@Resource(name="redisson-1")private Redisson redisson1;@Resource(name="redisson-2")private Redisson redisson2;@Resource(name="redisson-3")private Redisson redisson3;/**
* 在redis主从集群下的锁丢失问题
* 原因:因为redis本身是一个AP集群[保证可用性,牺牲一致性]
* 解决方法:使用红锁
*/
@GetMapping("/sk7")
public String seckillHandler7() {String result = null;RLock rLock1 = redisson1.getLock(REDIS_LOCK + "_1");RLock rLock2 = redisson1.getLock(REDIS_LOCK + "_2");RLock rLock3 = redisson1.getLock(REDIS_LOCK + "_3");RLock rLock = redisson.getRedLock(rLock1,rLock2,rLock3);try {// 添加锁//指定锁的过期时间为5秒,如果申请锁失败, 则最长等待20秒。Boolean ISLock = rLock.tryLock(20,5,TimeUnit.SECONDS);if (!ISLock) {return "没抢到锁";}result = "抱歉,您没有抢到";// 从redis中获取库存String stock = srt.opsForValue().get("sk:0008");// 将类型转化为整形int amount = stock == null ? 0 : Integer.parseInt(stock);// 如果库存量大于0,就抢购if (amount > 0) {srt.opsForValue().set("sk:0008", String.valueOf(--amount));result = "库存剩余" + amount + "台";System.out.println(result);}}catch (Exception e){e.printStackTrace();}finally {//释放锁rLock.unlock();}return result + "[server is" + serverPort + "]";
}

Rdisson

- Redisson 底层采用的是 Netty 框架。支持 Redis2.8 以上版本,支持 Java1.6+以上版本。
- 注意:为了避免锁到期但业务逻辑没有执行完毕而引发的多个线程同时访问共享资源的情况发生,Redisson 内部为锁提供了一个监控锁的看门狗 watch dog,其会在锁到期前不断延长锁的到期时间,直到锁被主动释放。即会自动完成“锁续命”。
ReentrantLock 是可重入锁,其是通过 AQS(抽象队列同步器)实现的锁机制synchronized 也是可重入锁,其是通过监视器模式(本质是 OS 的互斥锁)实现的锁机制@GetMapping("/test2")public String test2() {RPermitExpirableSemaphore rs = redisson.getPermitExpirableSemaphore("redis_semaphore");String permitId=null;try{//对信号量的申请(p操作)//申请一个信号,返回辨识IdpermitId=rs.acquire();//申请一个信号,若没有成功,则最多等待10秒,返回辨识IdpermitId=rs.acquire(10,TimeUnit.SECONDS);//业务逻辑。。。}catch (Exception e) {e.printStackTrace();}finally {//对信号量的释放(V操作)//释放一个信号,需要携带辨识IDrs.release(permitId);boolean releaseOk = rs.tryRelease(permitId);}return null;}
Redisson 可以获取到分布式闭锁 RCountDownLatch,其与 JDK 的 JUC 中的闭锁CountDownLatch 原理相同,用法类似。其常用于一个或者多个线程的执行必须在其它某些任务执行完毕的场景。例如,大规模分布式并行计算中,最终的合并计算必须基于很多并行计算的运行完毕。Barrier队列解决该问题,而 Barrier 队列通常使用 Zookeeper 实现。@GetMapping("/test3")
public String test3() {//获取闭锁的对象(合并线程与条件线程中都需要该代码)RCountDownLatch latch = redisson.getCountDownLatch("countDownLatch");//设置闭锁计数器初值,使用该语句的场景:// 1. Redis中没有设置该值// 2. Redis中设置了该值,但已经变为了0,需要重置latch.trySetCount(10);//在合并线程中要等待着闭锁的打开try{//阻塞合并线程,直到锁打开//latch.await();//阻塞合并线程,直到锁打开latch.await(5,TimeUnit.SECONDS);}catch (Exception e){e.printStackTrace();}//条件线程代码//使闭锁计数器减一latch.countDown();return null;
}