Redisson分布式锁
  # 1 Redisson使用示例
redission支持4种连接redis方式,分别为单机、主从、Sentinel、Cluster 集群。项目中使用的是集群模式。
@Configuration
public class RedissonConfig {
  @Autowired
  private RedisConfigBean redisConfigBean;
  @Bean
  public Redisson redisson() {
    List<String> clusterNodes = new ArrayList<>();
    for (Map<String, String> node : redisConfigBean.getNodesInfo()) {
      //redisson版本是3.5,集群的ip前面要加上“redis://”,不然会报错,3.2版本可不加
      clusterNodes.add("redis://" + node.get("ip") + ":" + node.get("port"));
    }
    Config config = new Config();
    ClusterServersConfig clusterServersConfig = config.useClusterServers();
    //添加集群节点
    clusterServersConfig.addNodeAddress(clusterNodes.toArray(new String[clusterNodes.size()]));
    if (!StringUtils.isEmpty(redisConfigBean.getPassword())) {
      //设置密码
      clusterServersConfig.setPassword(redisConfigBean.getPassword());
    }
    return (Redisson) Redisson.create(config);
  }
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
@ConfigurationProperties(
        prefix = "redis"
)
@RefreshScope
public class RedisConfigBean {
  private Integer cacheExpireTime;
  private Integer randomRange;
  private String password;
  private Integer timeoutInMillis;
  private Integer readTimeoutInMillis;
  private Integer dbIndex;
  private Integer maxTotal;
  private List<Map<String, String>> nodesInfo;
  public RedisConfigBean() {
  }
  public List<RedisNode> getNodesInfoList() {
    if (CollectionUtils.isEmpty(this.nodesInfo)) {
      throw new RedisException("redis nodes is empty");
    } else {
      List<RedisNode> list = new ArrayList();
      this.nodesInfo.forEach((map) -> {
        list.add(new RedisNode((String) map.get("ip"), Integer.parseInt((String) map.get("port"))));
      });
      return list;
    }
  }
}
 2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
redis:
  maxTotal: 2048
  ####### 使用RedisUtil里的setEx、putListCacheWithExpireTime、expire方法时,随机增加最大值为cacheExpireTime过期时间(秒)
  randomRange: 10
  #######  redis过期时间(秒)
  cacheExpireTime: 7200
  ####### JedisClient连接超时时间(毫秒)
  timeoutInMillis: 3000
  ####### JedisClient读取超时时间(毫秒)
  readTimeoutInMillis: 2500
  password: 89OBm#i9
  nodesInfo:
    - ip: 10.30.9.111
      port: 6379
 2
3
4
5
6
7
8
9
10
11
12
13
14
# 2 分布式锁演变过程
# 2.1 SETNX
 存在问题:
1)客户端所在节点奔溃,无法正确释放锁。
2)业务逻辑异常,无法释放锁。
# 2.2 超时设置
设置超时时间,到点锁自动释放。
SETNX lock:168 1 // 获取锁(integer) 1>
EXPIRE lock:168 60 // 60s 自动删除(integer) 1
存在问题:
1)「加锁」、「设置超时」是两个命令,不是原子操作。可能出现执行了第一条命令,第二条执行失败的情况。
解决方案:
Redis 2.6.x之后,官方拓展了SET命令的参数,支持设置超时时间,并且满足原子性。
set key_name random_value nx px 30000
nx 表示只有key_name不存在才能设值成功。
px 30000 表示30秒后自动过期。
# 2.3 只能释放自己的锁
存在问题:
自己的锁可能被别人释放。
比如:
1.线程1获取锁成功并设置30秒后超时。
2.线程1由于某些原因执行很慢(网络问题、fullGC问题等...),超过30秒还没执行完,此时Redis因为锁过期自动释放了锁。
3.线程2获取锁执行自己业务。
4.线程1执行完自己业务释放锁,结果此时释放成线程2的锁。
解决方案:
加锁的时候设置一个「唯一标识」作为value,释放锁的时候用自己的唯一标识和value作比较,匹配上才能释放锁。
加锁:
set key_name random_value nx px 30000
释放锁:
if (redis.get("key_name").equals(random_value)) {
//比对成功则删除
redis.del("key_name");
}
**问题:**释放锁时这种写法存在一个问题,get和del是两个操作,存在原子性问题。
可以通过Lua脚本实现原子性:
// 获取锁的 value 与 ARGV[1] 是否匹配,匹配则执行
delif redis.call("get",KEYS[1]) == ARGV[1]
then return redis.call("del",KEYS[1])
else return 0
end
# 2.4 正确设置锁超时
超时时间的设置一般为:通过多轮压测,取平均时间的3 ~ 5倍。
但即使这样仍然可能出现问题,可以通过以下方式完善超时时间设置:
给获取锁的线程添加一个守护线程,该守护线程定期检测锁的失效时间,如果锁快要失效,但是业务还没执行完,就对这个锁进行续期,重新设置超时时间。
# 2.5 实现可重入锁
 通过redis hash结构实现可重入锁。
加锁:
1.加锁时先使用redis exists判断key_name这个锁是否存在。
2.如果锁不存在,使用hincrby创建一个key_name的hash表,random_value对应的value_count初始化为0再加1。
3.如果key_name存在,用hexists判断random_value这个键存不存在,如果random_value存在,value_count使用hincrby加1,否则加锁失败。
解锁:
1.不存在key_name或不存在random_value,解锁失败。
2.存在指定random_value,则使用hincrby减1,当value_count小于等于0,使用del删除这把锁。释放锁成功。
# 3 Redis分布式锁存在什么缺点?
由于redis集群同步数据的方式是异步,假设master节点获取到锁之后未完成数据同步就挂了,这个时候在新的master节点依然可以获取锁,所以多个客户端会同时获取到锁。