高谈风月

不共青山一笑,不与黄花一醉

0%

手写zookeeper分布式锁

数据结构

在了解锁原理之前我们先来看一下ZK的数据结构,具体如下:

在 Zookeeper 中,每一个数据节点都是一个 ZNode,上图根目录下有两个节点,分别是:app1 和 app2,其中 app1 下面又有三个子节点。那么我们来看看 ZNode 数据结构到底是什么样子的呢。首先我们来了解 ZNode 的类型。

Zookeeper 节点类型可以分为三大类:持久性节点(Persistent)、瞬时性节点(Ephemeral)、顺序性节点(Sequential)。现实开发中在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、瞬时节点、瞬时有序节点。

(1) 持久节点:节点被创建后会一直存在服务器,直到删除操作主动清除,这种节点也是最常见的类型。

(2) 持久顺序节点:有顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序。

(3) 瞬时节点:会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点。

(4)瞬时有顺序节点:有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀。

那么此次我们的ZK分布式锁就是基于ZK的临时有序节点实现的,也就是上述的第四种节点。当然光凭借第四种临时有序节点是不够的,我们还需要用到ZK的另外一个比较重要的概念,那就是“ZK观察器”。

ZK观察器

ZK观察器可以监测到节点的变动,如果节点发生变更会通知到客户端。我们可以设置观察器的三个方法:getData(),getChildrean(),exists()。观察器有一个比较重要的特性就是只能监控一次,再监控需要重新设置。

原理流程

(1)利用ZK的瞬时有序节点的特性。

(2)多线程并发创建瞬时节点时,得到有序的序列。

(3)序号最小的线程获得锁。

(4)其他的线程则监听自己节点序号的前一个序号。

(5)前一个线程执行完成,删除自己序号的节点。

(6)下一个序号的线程得到通知,继续执行。

(7)依次类推

通过上述流程大家就会发现,其实在创建节点的时候,就已经确定了线程的执行顺序。大家看完这个流程可能有点模糊,咱们继续看下面的图解,老猫相信大家心里就会有一个更加清晰的认知。

流程一】我们有四个线程,分别是线程A、线程B、线程C、线程D。此时线程并发运行,这样就会在我们的ZK中创建四个临时有序节点,按照先来后到的顺序分别是1、2、3、4。此时按照我们流程描述中的第三点描述由于线程A对应的序号最小,所以A优先获取锁。

【流程二】再依次看第二个流程,此时当A获取锁之后,线程B的监听器会去监听1节点的执行情况,线程C的监听器会去监听2节点的执行情况,线程D的监听器会去监听3节点的执行情况依次类推。

【流程三】当线程A执行完毕之后会删除相关的节点1,此时会被线程B监听到,于是线程B开始执行,有线程C监听等待着线程B节点的释放,依次类推,直到这四个线程都执行完毕。

通过以上的图解,老猫觉得很多小伙伴对ZK锁的实现原理应该已经知道了,当然对ZK还是比较陌生的小伙伴也可以专门抽时间去熟悉一下ZK。接下来就和老猫一起来看一下具体的代码又是如何实现的吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class DistributeLocker implements AutoCloseable, Watcher {
private ZooKeeper zooKeeper;
private String zNode;

public DistributeLocker() throws IOException {
this.zooKeeper = new ZooKeeper("localhost:2181",60000,this);
}

public boolean getLock(String businessCode) {
try {
// 首先创建业务根节点
Stat stat = zooKeeper.exists("/"+businessCode,false);
if(stat == null) {
//表示创建一个业务根目录,此节点为持久节点,另外的由于在本地搭建的zk没有设置密码,所以采用OPEN_ACL_UNSAFE模式
zooKeeper.create("/" +businessCode,businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

//创建该目录下的有序瞬时节点,假如我们的订单业务编号是"xxx",那么第一个有序瞬时节点应该是/xxx/xxx_0000001
zNode =zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 获取 /xxx节点下的所有子节点,无需观察
List<String> childrenNodes = zooKeeper.getChildren("/"+businessCode,false);
//子节点排序
Collections.sort(childrenNodes);
//获取序号最小的子节点
String minNode = childrenNodes.get(0);
//如果创建的节点是最小序号的节点,那么就获得锁
if(zNode.endsWith(minNode)){
return true;
}
String lastNode = minNode;
for (String node : childrenNodes){
//如果临时节点不是第一个节点,那么监听前一个节点
if(zNode.endsWith(node)){
zooKeeper.exists("/"+businessCode+"/"+lastNode,true);
break;
}else {
lastNode = node;
}
}
//并发情况下wait方法让出锁,但是由于并发情景下,为了避免释放的时候错乱因此加上synchronized
synchronized (this){
wait();
}
//当被唤起
return true;
} catch (InterruptedException e) {
e.printStackTrace();
return false;
} catch (KeeperException e) {
e.printStackTrace();
return false;
}
}

@Override
public void close() throws Exception {
zooKeeper.delete(zNode,-1);
zooKeeper.close();
}

@Override
public void process(WatchedEvent event) {
//如果监听到节点被删除,那么则会通知下一个线程
if(event.getType() == Event.EventType.NodeDeleted){
synchronized (this){
notify();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Service
public class ZKLockService {
@Autowired
private DistributeLocker locker;
private String ORDER_KEY = "order_test";
public Integer createOrder() throws Exception{
log.info("进入了方法");
try {
if (locker.getLock(ORDER_KEY)) {
log.info("拿到了锁");
//此处为了手动演示并发,所以我们暂时在这里休眠
Thread.sleep(6000);
}
}catch (Exception e){
e.printStackTrace();
}finally {
locker.close();
}
log.info("方法执行完毕");
return 1;
}
}

curator客户端的使用

其实关于ZK锁的话还有可以用封装比较完善的客户端,那就是curator。这个客户端本身就已经实现了ZK的分布式锁,具体代码实现如下:

1
2
3
4
5
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Service
@Slf4j
public class CuratorLockService {
private String ORDER_KEY = "order_test";
@Autowired
private CuratorFramework client;
public Integer createOrder() throws Exception{
log.info("进入了方法");
InterProcessMutex lock = new InterProcessMutex(client, "/"+ORDER_KEY);
try {
if (lock.acquire(30, TimeUnit.SECONDS)) {
log.info("拿到了锁");
//此处为了手动演示并发,所以我们暂时在这里休眠6s
Thread.sleep(6000);
}
}catch (Exception e){
e.printStackTrace();
}finally {
try {
log.info("我释放了锁!!");
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
log.info("方法执行完毕");
return 1;
}
}

分布式锁的对比

到此,我们将分布式系统的锁的解决方案都已经和大家分享过了,最终咱们来进行一个对比,具体如下:

看了上面这个比较之后,其实在我们的实际项目中,还是推荐现成的 curator 实现方式以及redisson实现方式,因为毕竟目前来说是相当成熟的方案,不推荐由我们自己的代码去实现。

支持不要超过早餐费~