四, Zookeeper 分布式锁机制和代码实现
4.1 Zookeeper 分布式锁机制
- 分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性。
[为什么Zookeeper可以实现分布式锁?]
 
[分布式锁实现原理]

对实现过程的解读:
- zookeeper会在它的集群内维护一个永久根节点, 我们可以命名为
/locks , 这个根节点的每个子节点维护着每台客户端向zookeeper申请的锁; - 代表这个锁的节点,是一个临时节点并且带有序号, 客户端每次申请锁都要先创建一个这样的节点;
- 当客户端A向zookeeper申请锁时, 先在/locks下面创建一个临时带序号节点, 然后判断这个节点是否是/locks子节点集合(这个集合必须先排序)中的第一个节点, 如果是的话就直接得到锁.
- 当客户端B向zookeeper申请锁时, 也要在/locks下面创建一个临时带序号节点, 然后判断判断这个节点是否是/locks子节点集合(这个集合必须先排序)中的第一个节点, 如果不是, 就获取到这个节点的前一个节点(子节点集合中这个节点的前一个节点), 对这个前一个节点设置监听器, 等监听到这个前一个节点被删除了, 那么代表客户端B的这个节点就可以获得锁了.
- 再多的客户端都是如此判断.
其实如果有客户端C、客户端D等N个客户端争抢一个zk分布式锁,原理都是类似的。
- 大家都是上来直接创建一个锁节点下的子节点
- 如果自己不是第一个节点,就对自己上一个节点加监听器只要上一个节点释放锁(节点被删除了),自己就排到前面去了,相当于是一个排队机制。
- 而且用临时顺序节点的另外一个用意就是,如果某个客户端创建临时顺序节点之后,不小心自己宕机了也没关系,zk感知到那个客户端宕机,会自动删除对应的临时顺序节点,相当于自动释放锁,或者是自动取消自己的排队。
详细介绍:ZooKeeper分布式锁的实现原理
4.2 使用Java API 原生实现Zookeeper 分布式锁
不跟你多bb, 直接上代码
- 对zookeeper连接(包括连接, 以及对连接事件, 上一个节点删除事件的监听), 加锁, 解锁的实现
- DistributeLock.java
package cn.qsc.zkcase2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributeLock {
private final String connectString = "bigdata01:2181,bigdata02:2181,bigdata03:2181";
private final int sessiontimeout = 2000;
private final ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String preNodePath;
private String currentNode;
private String rootNode = "locks";
private String subNode = "seq-";
public DistributeLock() throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessiontimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(preNodePath))
{
waitLatch.countDown();
}
}
});
connectLatch.await();
Stat stat = zk.exists("/"+rootNode, false);
if( stat == null){
zk.create("/"+rootNode, rootNode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void zkLock(){
try {
currentNode = zk.create("/locks"+"/seqp-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("创建了当前节点currentNode: "+ currentNode);
Thread.sleep(10);
List<String> children = zk.getChildren("/"+rootNode, false);
System.out.println("创建后我就获取了子节点"+children);
if(children.size() == 1){
return;
}else{
Collections.sort(children);
System.out.println("当前所有节点为:"+children);
String currentNodeName = currentNode.substring(("/"+rootNode+"/").length());
int index = children.indexOf(currentNodeName);
System.out.println("当前节点:"+ currentNode +", 当前节点的名称: "+currentNodeName+", " +
" 当前节点的index为:" + index);
if(index == -1 ){
System.out.println("数据有误");
}else if (index == 0){
return;
}else{
preNodePath = ("/"+rootNode+"/")+children.get(index -1);
System.out.println("======前一个节点的地址: "+preNodePath);
System.out.println("currentNode为: "+ currentNode + "preNodePath="+preNodePath);
zk.getData(preNodePath, true, new Stat());
waitLatch.await();
return;
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void unzkLock(){
try {
zk.delete(currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
- 两个线程对应于两个分布式锁对象, 去分别开启线程获得锁, 释放锁
- DistributeLockTest.java
package cn.qsc.zkcase2;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
public class DistributeLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributeLock lock1 = new DistributeLock();
DistributeLock lock2 = new DistributeLock();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.zkLock();
System.out.println("线程1获取到锁");
Thread.sleep(5000);
lock1.unzkLock();
System.out.println("线程1释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.zkLock();
System.out.println("线程2获取到锁");
Thread.sleep(5000);
lock2.unzkLock();
System.out.println("线程2释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
4.3 使用Curator框架实现Zookeeper分布式锁(待总结)
参见此文
|