Redisson电子文档
大小:1.57 MB已被52人关注
Redisson使用手册是一个不错的学习资源,大小为1.57 MB,由焦代双 提供,Redisson类资源中评分为9.2。
Tags:Redisson Redisson手册
Redisson是一个在Redis的基础上实现的Java驻内存数据网格。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Redisson支持Redis 2.8以上版本,支持Java1.6+以上版本。
给大家精选了网上关于《Redisson使用手册》的学习笔记心得及相关实例内容,值得大家学习参考。
Java使用Redisson分布式锁实现原理
1. 基本用法
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.8.2</version> </dependency>
Config config = new Config(); config.useClusterServers() .setScanInterval(2000) // cluster state scan interval in milliseconds .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002"); RedissonClient redisson = Redisson.create(config); RLock lock = redisson.getLock("anyLock"); lock.lock(); try { ... } finally { lock.unlock(); }
针对上面这段代码,重点看一下Redisson是如何基于Redis实现分布式锁的
Redisson中提供的加锁的方法有很多,但大致类似,此处只看lock()方法
更多请参见https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers
2. 加锁
可以看到,调用getLock()方法后实际返回一个RedissonLock对象,在RedissonLock对象的lock()方法主要调用tryAcquire()方法
由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键
首先,看一下evalWriteAsync方法的定义
最后两个参数分别是keys和params
实际调用是这样的:
单独将调用的那一段摘出来看
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
结合上面的参数声明,我们可以知道,这里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)
假设前面获取锁时传的name是“abc”,假设调用的线程ID是Thread-1,假设成员变量UUID类型的id是6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c
那么KEYS[1]=abc,ARGV[2]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此,这段脚本的意思是
1、判断有没有一个叫“abc”的key
2、如果没有,则在其下设置一个字段为“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”,值为“1”的键值对 ,并设置它的过期时间
3、如果存在,则进一步判断“6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1”是否存在,若存在,则其值加1,并重新设置过期时间
4、返回“abc”的生存时间(毫秒)
这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。
用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程
所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)
3. 解锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); }
我们还是假设name=abc,假设线程ID是Thread-1
同理,我们可以知道
KEYS[1]是getName(),即KEYS[1]=abc
KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{abc}
ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0
ARGV[2]是生存时间
ARGV[3]是getLockName(threadId),即ARGV[3]=6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1
因此,上面脚本的意思是:
1、判断是否存在一个叫“abc”的key
2、如果不存在,向Channel中广播一条消息,广播的内容是0,并返回1
3、如果存在,进一步判断字段6f0829ed-bfd3-4e6f-bba3-6f3d66cd176c:Thread-1是否存在
4、若字段不存在,返回空,若字段存在,则字段值减1
5、若减完以后,字段值仍大于0,则返回0
6、减完后,若字段值小于或等于0,则广播一条消息,广播内容是0,并返回1;
可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了
4. 等待
以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?
再回到前面获取锁的位置
@Override public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 订阅 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); try { while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); } protected static final LockPubSub PUBSUB = new LockPubSub(); protected RFuture<RedissonLockEntry> subscribe(long threadId) { return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); } protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) { PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService()); }
这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源
当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞
5. 小结
6. 其它相关
《基于Redis的分布式锁的简单实现》
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持码农之家。
下一篇:360小程序设计指南
展开 +
收起 -
本书使用实际代码而非伪代码来描述算法,并以经验主导支撑数学分析,侧重于应用且规范严谨。本书提供了用多种程序设计语言实现的文档化的实际代码解决方案,还介绍了近40种核心算法,
立即下载Linux典藏大系自2010年陆续出版以来,因其内容丰富、讲解细腻、通俗易懂和实用性强等特色而深受广大读者的喜爱,长期位居同类图书销售排行榜的前列,累计销量近20万册。丛书中部分图书荣
立即下载最近由于工作很忙,很长时间没有更新博客了,今天为大家带来一篇有关Redisson实现分布式锁的文章,好了,不多说了,直接进入主题。 1. 可重入锁(Reentrant Lock) Redisson的分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口,同时还支持自动过期解锁。 public void testReentrantLock(RedissonClient redisson){ RLock lock = redisson.getLock("anyLock"); try{ // 1. 最常见的使用方法 //lock.lock(); // 2. 支持过期解锁功能,10秒钟以后自动解锁, 无需调用unlock方法手动解锁 //lock.lock(10, TimeUnit.SECONDS); // 3. 尝试加锁,最多等待3秒,上锁以后10秒自动解锁 boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS); if(res){ //成功 // do your business } } catch……
郗天籁
Copyright 2018-2021 www.xz577.com 码农之家
版权投诉 / 书籍推广 / 赞助:520161757@qq.com
Redisson分布式锁源码解析
Redisson锁继承Implements Reentrant Lock,所以具备 Reentrant Lock 锁中的一些特性:超时,重试,可中断等。加上Redisson中Redis具备分布式的特性,所以非常适合用来做Java中的分布式锁。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。 锁的接口定义了一下方法: 分布式锁当中加锁,我们常用的加锁接口: boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; 下面我们来看一下方法的具体实现: public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); final long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire……