Categories
程式開發

拜託,別再問我Zookeeper如何實現分佈式鎖了!


導讀

真是有人(鎖)的地方就有江湖(事務),今天不談江湖,來撩撩人。分佈式鎖的概念、為什麼使用分佈式鎖,想必大家已經很清楚了。前段時間作者寫過Redis是如何實現分佈式鎖,今天這篇文章來談談Zookeeper是如何實現分佈式鎖的。陳某今天分別從如下幾個方面來詳細講講ZK如何實現分佈式鎖:ZK的四種節點排它鎖的實現讀寫鎖的實現Curator實現分步式鎖

ZK的四種節點

持久性節點:節點創建後將會一直存在臨時節點:臨時節點的生命週期和當前會話綁定,一旦當前會話斷開臨時節點也會刪除,當然可以主動刪除。持久有序節點:節點創建一直存在,並且zk會自動為節點加上一個自增的後綴作為新的節點名稱。臨時有序節點:保留臨時節點的特性,並且zk會自動為節點加上一個自增的後綴作為新的節點名稱。

排它鎖的實現

排他鎖的實現相對簡單一點,利用了zk的創建節點不能重名的特性。如下圖:

拜託,別再問我Zookeeper如何實現分佈式鎖了! 11

根據上圖分析大致分為如下步驟:嘗試獲取鎖:創建臨時節點,zk會保證只有一個客戶端創建成功。創建臨時節點成功,獲取鎖成功,執行業務邏輯,業務執行完成後刪除鎖。創建臨時節點失敗,阻塞等待。監聽刪除事件,一旦臨時節點刪除了,表示互斥操作完成了,可以再次嘗試獲取鎖。遞歸:獲取鎖的過程是一個遞歸的操作,獲取鎖->監聽->獲取鎖。如何避免死鎖:創建的是臨時節點,當服務宕機會話關閉後臨時節點將會被刪除,鎖自動釋放。

代碼實現

作者參照JDK鎖的實現方式加上模板方法模式的封裝,封裝接口如下:

/**
* @Description ZK分布式锁的接口
* @Author 陈某
* @Date 2020/4/7 22:52
*/
public interface ZKLock {
/**
* 获取锁
*/
void lock() throws Exception;

/**
* 解锁
*/
void unlock() throws Exception;
}

模板抽像類如下:

/**
* @Description 排他锁,模板类
* @Author 陈某
* @Date 2020/4/7 22:55
*/
public abstract class AbstractZKLockMutex implements ZKLock {

/**
* 节点路径
*/
protected String lockPath;

/**
* zk客户端
*/
protected CuratorFramework zkClient;

private AbstractZKLockMutex(){}

public AbstractZKLockMutex(String lockPath,CuratorFramework client){
this.lockPath=lockPath;
this.zkClient=client;
}

/**
* 模板方法,搭建的获取锁的框架,具体逻辑交于子类实现
* @throws Exception
*/
@Override
public final void lock() throws Exception {
//获取锁成功
if (tryLock()){
System.out.println(Thread.currentThread().getName()+"获取锁成功");
}else{ //获取锁失败
//阻塞一直等待
waitLock();
//递归,再次获取锁
lock();
}
}

/**
* 尝试获取锁,子类实现
*/
protected abstract boolean tryLock() ;

/**
* 等待获取锁,子类实现
*/
protected abstract void waitLock() throws Exception;

/**
* 解锁:删除节点或者直接断开连接
*/
@Override
public abstract void unlock() throws Exception;
}

排他鎖的具體實現類如下:

/**
* @Description 排他锁的实现类,继承模板类 AbstractZKLockMutex
* @Author 陈某
* @Date 2020/4/7 23:23
*/
@Data
public class ZKLockMutex extends AbstractZKLockMutex {

/**
* 用于实现线程阻塞
*/
private CountDownLatch countDownLatch;

public ZKLockMutex(String lockPath,CuratorFramework zkClient){
super(lockPath,zkClient);
}

/**
* 尝试获取锁:直接创建一个临时节点,如果这个节点存在创建失败抛出异常,表示已经互斥了,
* 反之创建成功
* @throws Exception
*/
@Override
protected boolean tryLock() {
try {
zkClient.create()
//临时节点
.withMode(CreateMode.EPHEMERAL)
//权限列表 world:anyone:crdwa
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(lockPath,"lock".getBytes());
return true;
}catch (Exception ex){
return false;
}
}

/**
* 等待锁,一直阻塞监听
* @return 成功获取锁返回true,反之返回false
*/
@Override
protected void waitLock() throws Exception {
//监听节点的新增、更新、删除
final NodeCache nodeCache = new NodeCache(zkClient, lockPath);
//启动监听
nodeCache.start();
ListenerContainer listenable = nodeCache.getListenable();

//监听器
NodeCacheListener listener=()-> {
//节点被删除,此时获取锁
if (nodeCache.getCurrentData() == null) {
//countDownLatch不为null,表示节点存在,此时监听到节点删除了,因此-1
if (countDownLatch != null)
countDownLatch.countDown();
}
};
//添加监听器
listenable.addListener(listener);

//判断节点是否存在
Stat stat = zkClient.checkExists().forPath(lockPath);
//节点存在
if (stat!=null){
countDownLatch=new CountDownLatch(1);
//阻塞主线程,监听
countDownLatch.await();
}
//移除监听器
listenable.removeListener(listener);
}

/**
* 解锁,直接删除节点
* @throws Exception
*/
@Override
public void unlock() throws Exception {
zkClient.delete().forPath(lockPath);
}
}

可重入性排他鎖如何設計

可重入的邏輯很簡單,在本地保存一個ConcurrentMap,key是當前線程,value是定義的數據,結構如下:

private final ConcurrentMap threadData = Maps.newConcurrentMap();

重入的偽代碼如下:

public boolean tryLock(){
//判断当前线程是否在threadData保存过
//存在,直接return true
//不存在执行获取锁的逻辑
//获取成功保存在threadData中
}

讀寫鎖的實現

讀寫鎖分為讀鎖和寫鎖,區別如下:讀鎖允許多個線程同時讀數據,但是在讀的同時不允許寫線程修改。寫鎖在獲取後,不允許多個線程同時寫或者讀。如何實現讀寫鎖? ZK中有一類節點叫臨時有序節點,上文有介紹。下面我們來利用臨時有序節點來實現讀寫鎖的功能。

讀鎖的設計

讀鎖允許多個線程同時進行讀,並且在讀的同時不允許線程進行寫操作,實現原理如下圖:

拜託,別再問我Zookeeper如何實現分佈式鎖了! 12

根據上圖,獲取一個讀鎖分為以下步驟:創建臨時有序節點(當前線程擁有的讀鎖或稱作讀節點)。獲取路徑下所有的子節點,並進行從小到大排序獲取當前節點前的臨近寫節點(寫鎖)。如果不存在的臨近寫節點,則成功獲取讀鎖。如果存在臨近寫節點,對其監聽刪除事件。一旦監聽到刪除事件,重複2,3,4,5的步驟(遞歸)。

寫鎖的設計

線程一旦獲取了寫鎖,不允許其他線程讀和寫。實現原理如下:

拜託,別再問我Zookeeper如何實現分佈式鎖了! 13

從上圖可以看出唯一和寫鎖不同的就是監聽的節點,這裡是監聽臨近節點(讀節點或者寫節點),讀鎖只需要監聽寫節點,步驟如下:創建臨時有序節點(當前線程擁有的寫鎖或稱作寫節點)。獲取路徑下的所有子節點,並進行從小到大排序。獲取當前節點的臨近節點(讀節點和寫節點)。如果不存在臨近節點,則成功獲取鎖。如果存在臨近節點,對其進行監聽刪除事件。一旦監聽到刪除事件,重複2,3,4,5的步驟(遞歸)。

如何監聽

無論是寫鎖還是讀鎖都需要監聽前面的節點,不同的是讀鎖只監聽臨近的寫節點,寫鎖是監聽臨近的所有節點,抽像出來看其實是一種鍊式的監聽,如下圖:

拜託,別再問我Zookeeper如何實現分佈式鎖了! 14

每一個節點都在監聽前面的臨近節點,一旦前面一個節點刪除了,再從新排序後監聽前面的節點,這樣遞歸下去。

代碼實現

作者簡單的寫了讀寫鎖的實現,先造出來再優化,不建議用在生產環境。代碼如下:

public class ZKLockRW {

/**
* 节点路径
*/
protected String lockPath;

/**
* zk客户端
*/
protected CuratorFramework zkClient;

/**
* 用于阻塞线程
*/
private CountDownLatch countDownLatch=new CountDownLatch(1);

private final static String WRITE_NAME="_W_LOCK";

private final static String READ_NAME="_R_LOCK";

public ZKLockRW(String lockPath, CuratorFramework client) {
this.lockPath=lockPath;
this.zkClient=client;
}

/**
* 获取锁,如果获取失败一直阻塞
* @throws Exception
*/
public void lock() throws Exception {
//创建节点
String node = createNode();
//阻塞等待获取锁
tryLock(node);
countDownLatch.await();
}

/**
* 创建临时有序节点
* @return
* @throws Exception
*/
private String createNode() throws Exception {
//创建临时有序节点
return zkClient.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(lockPath);
}

/**
* 获取写锁
* @return
*/
public ZKLockRW writeLock(){
return new ZKLockRW(lockPath+WRITE_NAME,zkClient);
}

/**
* 获取读锁
* @return
*/
public ZKLockRW readLock(){
return new ZKLockRW(lockPath+READ_NAME,zkClient);
}

private void tryLock(String nodePath) throws Exception {
//获取所有的子节点
List childPaths = zkClient.getChildren()
.forPath("/")
.stream().sorted().map(o->"/"+o).collect(Collectors.toList());

//第一个节点就是当前的锁,直接获取锁。递归结束的条件
if (nodePath.equals(childPaths.get(0))){
countDownLatch.countDown();
return;
}

//1. 读锁:监听最前面的写锁,写锁释放了,自然能够读了
if (nodePath.contains(READ_NAME)){
//查找临近的写锁
String preNode = getNearWriteNode(childPaths, childPaths.indexOf(nodePath));
if (preNode==null){
countDownLatch.countDown();
return;
}
NodeCache nodeCache=new NodeCache(zkClient,preNode);
nodeCache.start();
ListenerContainer listenable = nodeCache.getListenable();
listenable.addListener(() -> {
//节点删除事件
if (nodeCache.getCurrentData()==null){
//继续监听前一个节点
String nearWriteNode = getNearWriteNode(childPaths, childPaths.indexOf(preNode));
if (nearWriteNode==null){
countDownLatch.countDown();
return;
}
tryLock(nearWriteNode);
}
});
}

//如果是写锁,前面无论是什么锁都不能读,直接循环监听上一个节点即可,直到前面无锁
if (nodePath.contains(WRITE_NAME)){
String preNode = childPaths.get(childPaths.indexOf(nodePath) - 1);
NodeCache nodeCache=new NodeCache(zkClient,preNode);
nodeCache.start();
ListenerContainer listenable = nodeCache.getListenable();
listenable.addListener(() -> {
//节点删除事件
if (nodeCache.getCurrentData()==null){
//继续监听前一个节点
tryLock(childPaths.get(childPaths.indexOf(preNode) - 1<0?0:childPaths.indexOf(preNode) - 1)); } }); } } /** * 查找临近的写节点 * @param childPath 全部的子节点 * @param index 右边界 * @return */ private String getNearWriteNode(List childPath,Integer index){ for (int i = 0; i < index; i++) { String node = childPath.get(i); if (node.contains(WRITE_NAME)) return node; } return null; } }

Curator實現分步式鎖

Curator是Netflix公司開源的一個Zookeeper客戶端,與Zookeeper提供的原生客戶端相比,Curator的抽象層次更高,簡化了Zookeeper客戶端的開發量。 Curator在分佈式鎖方面已經為我們封裝好了,大致實現的思路就是按照作者上述的思路實現的。中小型互聯網公司還是建議直接使用框架封裝好的,畢竟穩定,有些大型的互聯公司都是手寫的,牛逼啊。創建一個排他鎖很簡單,如下:

//arg1:CuratorFramework连接对象,arg2:节点路径
lock=new InterProcessMutex(client,path);
//获取锁
lock.acquire();
//释放锁
lock.release();

更多的API請參照官方文檔,不是此篇文章重點。至此ZK實現分佈式鎖就介紹完了,如有想要源碼的朋友,老規矩,回復關鍵詞分佈式鎖獲取。

一點小福利

對於Zookeeper不太熟悉的朋友,陳某特地花費兩天時間總結了ZK的常用知識點,包括ZK常用shell命令、ZK權限控制、Curator的基本操作API。目錄如下:

拜託,別再問我Zookeeper如何實現分佈式鎖了! 15

需要上面PDF文件的朋友,老規矩,回復關鍵詞ZK總結。