基于ZooKeeper临时顺序节点的分布式锁实现
[TOC]
1. 分布式锁
分布式锁是一种在分布式系统中协调进程或线程之间对共享资源进行访问控制的机制。在分布式系统中,多个进程或线程同时访问同一个资源时,为了保证数据的一致性和正确性,需要进行同步访问,避免多个进程或线程同时修改同一个资源造成数据的不一致。分布式锁就是为了解决这个问题而产生的。
在Java多线程中,我们了解过锁,这种锁主要用来控制一个JVM内的多个线程对本地共享资源的访问

但是在分布式系统中,可能会有运行在不同JVM上的线程来访问同一份资源,这时候本地的锁就无法解决问题了,分布式锁因而诞生。

一个最基本的分布式锁需要满足:
- 互斥 :任意一个时刻,锁只能被一个线程持有;
- 高可用 :锁服务是高可用的。并且,即使客户端的释放锁的代码逻辑出现问题,锁最终一定还是会被释放,不会影响其他线程对共享资源的访问。
- 可重入:一个节点获取了锁之后,还可以再次获取锁。
2. ZooKeeper 在Java中的基本使用
若想在Java中使用ZooKeeper,首先我们还是需要先启动ZooKeeper集群(可以有多个ZooKeeper服务端相连接,一般是奇数个)

接下来我们就可以通过Java来控制ZooKeeper操作了
在操作中有一系列参数,这里暂时不一一解释,咱们用到哪说到哪,感兴趣的小伙伴也可以自己去查一查
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat;
public class ZKOper { private static String nodeAdds = "localhost:2181,localhost:2182,localhost:2183";
public static void main(String[] args) throws Exception { ZooKeeper zk = new ZooKeeper(nodeAdds, 3000, watchedEvent -> {});
String seq = zk.create("/hnu", "loginIn".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("保存成功,序号:" + seq); byte[] data = zk.getData("/hnu",null,null); String v = new String(data); System.out.println("取得节点内的数据" + v); Stat rs = zk.setData(seq, "loginOut".getBytes(),-1); System.out.println("修改结果" + rs); zk.delete(seq, -1); System.out.println("删除成功" + seq); zk.close(); } }
|
在输出中,我们就可以观察到程序对ZooKeeper进行的一系列curd操作了
3. ZooKeeper实现分布式锁的思路
ZooKeeper实现分布式锁主要依赖它的两种特性:
Watcher 监听机制是 ZooKeeper 的客户端与服务端之间的一种通信方式。当客户端与服务端建立连接后,客户端可以注册一个 Watcher 对象,来监听某个节点的变化。当这个节点的状态发生变化时,服务端会向客户端发送一个通知,告诉客户端这个节点的状态已经发生了变化。客户端收到通知后,可以根据需要执行一些操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import org.apache.zookeeper.*;
public class ZKWatch {
private static String nodeAdds = "localhost:2181,localhost:2182,localhost:2183"; public static void main(String[] args) throws Exception { Watcher watch=new Watcher(){
public void process(WatchedEvent e) { System.out.println("WatchedEvent:"+e.getPath()+" "+e.getType()); if(e.getType()==Watcher.Event.EventType.NodeDeleted){ System.out.println(" NodeDeleted"); } if(e.getType()==Watcher.Event.EventType.NodeCreated){ System.out.println(" NodeCreated"); } if(e.getType()==Watcher.Event.EventType.NodeDataChanged){ System.out.println(" NodeDataChanged"); } } }; ZooKeeper zk = new ZooKeeper(nodeAdds, 3000, watch); zk.addWatch("/", watch, AddWatchMode.PERSISTENT_RECURSIVE); while(true) { Thread.sleep(1000); }
} }
|
如果启动了以上程序,再启动curd,该程序就会根据监听到的信息打印了
ZooKeeper 节点有四种类型:
-
持久节点(Persistent Node):创建后一直存在,直到被主动删除。即使创建节点的客户端断开连接,节点依然存在。
-
临时节点(Ephemeral Node):只在创建它的客户端与 ZooKeeper 之间的连接保持有效时存在。如果客户端与 ZooKeeper 断开连接,那么临时节点也会被删除。
-
持久顺序节点(Persistent Sequential Node):创建后一直存在,直到被主动删除。与持久节点不同的是,持久顺序节点会根据节点的创建顺序为节点分配一个唯一的编号。这个编号是由 ZooKeeper 保证唯一的。
-
临时顺序节点(Ephemeral Sequential Node):只在创建它的客户端与 ZooKeeper 之间的连接保持有效时存在。与临时节点不同的是,临时顺序节点也会根据节点的创建顺序为节点分配一个唯一的编号。这个编号是由 ZooKeeper 保证唯一的。如果客户端与 ZooKeeper 断开连接,那么临时顺序节点也会被删除。
临时节点的特征:
- session连接断开就没了
- 不能够创建子节点
- 不能有同名节点
- 监听节点删除和修改的变化
根据以上两种特性,我们就可以设计出一种分布式锁
- 当客户端请求数据时,会在ZooKeeper中建立一个临时顺序节点,这个节点有一个自己的序号
- 判断当前序号是否是最小的,如果是则获得分布式锁,可以执行操作
- 如果序号不是最小的,监听序号为 当前序号-1 的节点,而后线程进入wait状态,当序号为当前序号-1的节点被删除,则notify,可以开始操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| import org.apache.zookeeper.*;
import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Random; import java.util.function.DoubleToIntFunction;
public class ZKShareLock extends Thread{ private Object innerLock = new Object(); private ZooKeeper zconn;
private String basePath="/ShareLocksTest";
private String userPath=basePath+"/User-"; private String cName;
public ZKShareLock(ZooKeeper zconn, String cName) { this.zconn = zconn; this.cName = cName; }
private void todoSome() throws Exception{ Random ran = new Random(); int t = ran.nextInt(3000)+2000; Thread.sleep(t); } private void unLock(String myPath) { try { System.out.println(cName + "释放锁" + myPath); zconn.delete(myPath, -1); zconn.close(); } catch (Exception e) { e.printStackTrace(); } } public void run(){ try { String mySeq = zconn.create(userPath, cName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(cName + " 1-创建顺序临时节点成功:" + mySeq); List<String> paths=zconn.getChildren(basePath, null); List<String> seqs = new ArrayList<>(); for(String s:paths) { s = s.substring(s.length()-10,s.length()); seqs.add(s); } Collections.sort(seqs); String myss = mySeq.substring(mySeq.length()-10,mySeq.length()); int index = seqs.indexOf(myss); System.out.println(cName + " 2-自己节点所处的排序位置:" + index); if (index==0){ System.out.println(cName + " 3-1 自己节点最小,获得锁"); todoSome(); unLock(userPath+myss); System.out.println(cName + " 3-2 执行任务完毕,释放锁"); zconn.close(); return; } else { String second = seqs.get(index - 1); String secondPath = userPath + second; System.out.println(cName + " 3- 排位后靠,要等待在路径:" + secondPath); addWatcher(secondPath); try { System.out.println(cName + " 3-3 在innerLock上锁定,等待通知事件:"); synchronized (innerLock) { innerLock.wait(); } } catch (Exception e) { e.printStackTrace(); } System.out.println(cName + " 3-4 通知事件解除,获得锁"); todoSome(); unLock(userPath + myss); zconn.close(); return; } }catch (Exception e){ e.printStackTrace(); } }
private void addWatcher(String secondPath) throws Exception{ Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) { System.out.println("NodeDeleted"); try { synchronized (innerLock) { innerLock.notify(); } System.out.println(cName+"3-2 监听到节点:"+secondPath+"被删除,发出通知"); } catch (Exception e) { e.printStackTrace(); } } } }; zconn.addWatch(secondPath, watcher,AddWatchMode.PERSISTENT); System.out.println(cName+" 3-1 在此路径上加上监听器:"+secondPath); }
public static void main(String[] args) { String nodeAdds = "localhost:2181,localhost:2182,localhost:2183"; for (int i = 0;i<10;i++){ try { ZooKeeper zconn = new ZooKeeper(nodeAdds, 3000, watchedEvent -> {}); ZKShareLock m1 = new ZKShareLock(zconn,"用户"+i); m1.start(); } catch (IOException e) { e.printStackTrace(); } } } }
|
代码中仍可能存在的问题:
如果由于网络原因,某个zookeeper客户端提前断开了连接,会导致后续的节点检测到它已被删除,而可能提前解锁进入操作,不过在本地测试环境不太容易出现这种问题。若想在本代码中解决问题,在每次唤醒后再次判断自己是否为当前最小节点也可(有点麻烦)。
基于临时顺序节点实现分布式锁的大体是这样的,可能在代码实现上会还有一些差异。
4. 两种基于ZooKeeper分布式锁的优缺点
ZooKeeper提供了两种实现分布式锁的方式:基于临时顺序节点和基于临时节点的锁。我们前面所说的都是基于临时顺序节点的分布式锁,还有一种是基于临时节点的分布式锁。
- 基于临时顺序节点的锁
基于临时顺序节点的锁实现原理是,在ZooKeeper中创建一个临时顺序节点,每个客户端都可以创建这样的节点。当客户端需要获取锁时,它会在指定的ZooKeeper目录下创建一个临时顺序节点,并且获取当前目录下所有的节点列表。然后,它将节点列表排序,并检查自己创建的节点是否是当前节点列表中的第一个节点。如果是,那么它获得了锁,如果不是,那么它会监视比自己小的节点的删除事件,直到这些节点被删除,然后再次尝试获取锁。
优点:
- 能够避免羊群效应(herd effect),即多个客户端同时争抢锁的情况,因为每个客户端都会按顺序获取锁。
- 可以保证客户端获取锁的顺序和创建节点的顺序一致,因此可以从一定程度上避免死锁的情况。
缺点:
- 需要频繁创建和删除临时节点,当锁的获取和释放频繁发生时,会对ZooKeeper的性能产生一定影响。
- 基于临时节点的锁
基于临时节点的锁实现原理是,在ZooKeeper中创建一个临时节点,这个节点就是锁的代表。当客户端需要获取锁时,它会在指定的ZooKeeper目录下创建一个临时节点。如果它成功创建了这个节点,那么它就获得了锁,否则它需要等待。当客户端释放锁时,它会删除自己创建的临时节点。
优点:
- 由于每个客户端只创建一个节点,因此可以减少ZooKeeper的负载,对性能的影响比较小。
- 可以避免由于客户端崩溃或网络故障导致锁无法释放的问题。
缺点:
- 存在羊群效应,当多个客户端同时争抢锁的情况时,可能会导致大量的客户端同时请求ZooKeeper,影响性能。
- 无法保证客户端获取锁的顺序,因此可能会出现死锁的情况。
如果应用场景中锁的获取和释放频率较低,且需要保证锁的可靠性,那么基于临时节点的锁更为合适;如果应用场景中锁的获取和释放频率较高,且需要保证客户端获取锁的顺序和避免死锁的情况,那么基于临时顺序节点的锁更为合适。
本文参考:
JavaGuide、CSDN、ChatGPT