数据结构
在了解锁原理之前我们先来看一下ZK的数据结构,具体如下:
在 Zookeeper 中,每一个数据节点都是一个 ZNode,上图根目录下有两个节点,分别是:app1 和 app2,其中 app1 下面又有三个子节点。那么我们来看看 ZNode 数据结构到底是什么样子的呢。首先我们来了解 ZNode 的类型。
Zookeeper 节点类型可以分为三大类:持久性节点(Persistent)、瞬时性节点(Ephemeral)、顺序性节点(Sequential)。现实开发中在创建节点的时候通过组合可以生成以下四种节点类型:持久节点、持久顺序节点、瞬时节点、瞬时有序节点。
(1) 持久节点:节点被创建后会一直存在服务器,直到删除操作主动清除,这种节点也是最常见的类型。
(2) 持久顺序节点:有顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序。
(3) 瞬时节点:会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点。
(4)瞬时有顺序节点:有顺序的临时节点,和持久顺序节点相同,在其创建的时候会在名字后面加上数字后缀。
那么此次我们的ZK分布式锁就是基于ZK的临时有序节点实现的,也就是上述的第四种节点。当然光凭借第四种临时有序节点是不够的,我们还需要用到ZK的另外一个比较重要的概念,那就是“ZK观察器”。
ZK观察器
ZK观察器可以监测到节点的变动,如果节点发生变更会通知到客户端。我们可以设置观察器的三个方法:getData(),getChildrean(),exists()。观察器有一个比较重要的特性就是只能监控一次,再监控需要重新设置。
原理流程
(1)利用ZK的瞬时有序节点的特性。
(2)多线程并发创建瞬时节点时,得到有序的序列。
(3)序号最小的线程获得锁。
(4)其他的线程则监听自己节点序号的前一个序号。
(5)前一个线程执行完成,删除自己序号的节点。
(6)下一个序号的线程得到通知,继续执行。
(7)依次类推
通过上述流程大家就会发现,其实在创建节点的时候,就已经确定了线程的执行顺序。大家看完这个流程可能有点模糊,咱们继续看下面的图解,老猫相信大家心里就会有一个更加清晰的认知。
流程一】我们有四个线程,分别是线程A、线程B、线程C、线程D。此时线程并发运行,这样就会在我们的ZK中创建四个临时有序节点,按照先来后到的顺序分别是1、2、3、4。此时按照我们流程描述中的第三点描述由于线程A对应的序号最小,所以A优先获取锁。
【流程二】再依次看第二个流程,此时当A获取锁之后,线程B的监听器会去监听1节点的执行情况,线程C的监听器会去监听2节点的执行情况,线程D的监听器会去监听3节点的执行情况依次类推。
【流程三】当线程A执行完毕之后会删除相关的节点1,此时会被线程B监听到,于是线程B开始执行,有线程C监听等待着线程B节点的释放,依次类推,直到这四个线程都执行完毕。
通过以上的图解,老猫觉得很多小伙伴对ZK锁的实现原理应该已经知道了,当然对ZK还是比较陌生的小伙伴也可以专门抽时间去熟悉一下ZK。接下来就和老猫一起来看一下具体的代码又是如何实现的吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| public class DistributeLocker implements AutoCloseable, Watcher { private ZooKeeper zooKeeper; private String zNode;
public DistributeLocker() throws IOException { this.zooKeeper = new ZooKeeper("localhost:2181",60000,this); }
public boolean getLock(String businessCode) { try { Stat stat = zooKeeper.exists("/"+businessCode,false); if(stat == null) { zooKeeper.create("/" +businessCode,businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }
zNode =zooKeeper.create("/" + businessCode + "/" + businessCode + "_", businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); List<String> childrenNodes = zooKeeper.getChildren("/"+businessCode,false); Collections.sort(childrenNodes); String minNode = childrenNodes.get(0); if(zNode.endsWith(minNode)){ return true; } String lastNode = minNode; for (String node : childrenNodes){ if(zNode.endsWith(node)){ zooKeeper.exists("/"+businessCode+"/"+lastNode,true); break; }else { lastNode = node; } } synchronized (this){ wait(); } return true; } catch (InterruptedException e) { e.printStackTrace(); return false; } catch (KeeperException e) { e.printStackTrace(); return false; } }
@Override public void close() throws Exception { zooKeeper.delete(zNode,-1); zooKeeper.close(); }
@Override public void process(WatchedEvent event) { if(event.getType() == Event.EventType.NodeDeleted){ synchronized (this){ notify(); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Service public class ZKLockService { @Autowired private DistributeLocker locker; private String ORDER_KEY = "order_test"; public Integer createOrder() throws Exception{ log.info("进入了方法"); try { if (locker.getLock(ORDER_KEY)) { log.info("拿到了锁"); Thread.sleep(6000); } }catch (Exception e){ e.printStackTrace(); }finally { locker.close(); } log.info("方法执行完毕"); return 1; } }
|
curator客户端的使用
其实关于ZK锁的话还有可以用封装比较完善的客户端,那就是curator。这个客户端本身就已经实现了ZK的分布式锁,具体代码实现如下:
1 2 3 4 5
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Service @Slf4j public class CuratorLockService { private String ORDER_KEY = "order_test"; @Autowired private CuratorFramework client; public Integer createOrder() throws Exception{ log.info("进入了方法"); InterProcessMutex lock = new InterProcessMutex(client, "/"+ORDER_KEY); try { if (lock.acquire(30, TimeUnit.SECONDS)) { log.info("拿到了锁"); Thread.sleep(6000); } }catch (Exception e){ e.printStackTrace(); }finally { try { log.info("我释放了锁!!"); lock.release(); } catch (Exception e) { e.printStackTrace(); } } log.info("方法执行完毕"); return 1; } }
|
分布式锁的对比
到此,我们将分布式系统的锁的解决方案都已经和大家分享过了,最终咱们来进行一个对比,具体如下:
看了上面这个比较之后,其实在我们的实际项目中,还是推荐现成的 curator 实现方式以及redisson实现方式,因为毕竟目前来说是相当成熟的方案,不推荐由我们自己的代码去实现。