Zookeeper分布式锁

原创转载请注明出处:<https://www.cnblogs.com/agilestyle/p/11605319.html&gt;

Zookeeper是一种提供“分布式服务协调“的中心化服务,分布式应用程序才可以基于Zookeeper的以下两个特性实现分布式锁功能。

  • 顺序临时节点:Zookeeper提供一个多层级的节点命名空间(节点称为Znode),每个节点都用一个以斜杠(/)分隔的路径来表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),每个节点还能被标记为有序性(SEQUENTIAL),一旦节点被标记为有序性,那么整个节点就具有顺序自增的特点。一般可以组合这几类节点来创建所需要的节点,例如,创建一个持久节点作为父节点,在父节点下面创建临时节点,并标记该临时节点为有序性。
  • Watch机制:Zookeeper还提供了另外一个重要的特性,Watcher(事件监听器)。ZooKeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper服务端会将事件通知给用户。

熟悉了Zookeeper的这两个特性之后,就可以看看Zookeeper是如何实现分布式锁的了。

首先,需要建立一个父节点,节点类型为持久节点(PERSISTENT) ,每当需要访问共享资源时,就会在父节点下建立相应的顺序子节点,节点类型为临时节点(EPHEMERAL),且标记为有序性(SEQUENTIAL),并且以临时节点名称+父节点名称+顺序号组成特定的名字。

在建立子节点后,对父节点下面的所有以临时节点名称name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,如果是最小节点,则获得锁。

如果不是最小节点,则阻塞等待锁,并且获得该节点的上一顺序节点,为其注册监听事件,等待节点对应的操作获得锁。

当调用完共享资源后,删除该节点,关闭zk,进而可以触发监听事件,释放该锁。

![]()

以上实现的分布式锁是严格按照顺序访问的并发锁。一般还可以直接引用Curator框架来实现Zookeeper分布式锁,代码如下:

Maven Dependency

&lt;dependency&gt;
    &lt;groupId&gt;org.apache.curator&lt;/groupId&gt;
    &lt;artifactId&gt;curator-recipes&lt;/artifactId&gt;
    &lt;version&gt;4.2.0&lt;/version&gt;
&lt;/dependency&gt;

DistributedLock.java

 1 package org.fool.spring.util;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 
 5 public interface DistributedLock {
 6 
 7     void lock() throws Exception;
 8 
 9     boolean tryLock(long time, TimeUnit unit) throws Exception;
10 
11     void unlock() throws Exception;
12 
13     boolean isAcquiredInThisProcess();
14 }

ZkDistributedLock.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
 5 
 6 import java.util.concurrent.TimeUnit;
 7 
 8 public class ZkDistributedLock implements DistributedLock {
 9 
10     private InterProcessMutex mutex;
11 
12     ZkDistributedLock(ZkClient zkClient, String lockPath) {
13         CuratorFramework client = zkClient.getClient();
14         this.mutex = new InterProcessMutex(client, lockPath);
15     }
16 
17     @Override
18     public void lock() throws Exception {
19         this.mutex.acquire();
20     }
21 
22     @Override
23     public boolean tryLock(long time, TimeUnit unit) throws Exception {
24         return this.mutex.acquire(time, unit);
25     }
26 
27     @Override
28     public void unlock() throws Exception {
29         this.mutex.release();
30     }
31 
32     @Override
33     public boolean isAcquiredInThisProcess() {
34         return this.mutex.isAcquiredInThisProcess();
35     }
36 
37 }

ZkClient.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.framework.CuratorFramework;
 4 
 5 public class ZkClient {
 6 
 7     private final CuratorFramework client;
 8 
 9     ZkClient(CuratorFramework client) {
10         this.client = client;
11     }
12 
13     CuratorFramework getClient() {
14         return this.client;
15     }
16 
17     /**
18      * start the client
19      */
20     public void start() {
21         this.client.start();
22     }
23 
24     /**
25      * close the client
26      */
27     public void close() {
28         this.client.close();
29     }
30 
31 }

DistributedLocks.java

 1 package org.fool.spring.util;
 2 
 3 import org.apache.curator.RetryPolicy;
 4 import org.apache.curator.framework.CuratorFramework;
 5 import org.apache.curator.framework.CuratorFrameworkFactory;
 6 import org.apache.curator.retry.ExponentialBackoffRetry;
 7 
 8 public final class DistributedLocks {
 9 
10     private DistributedLocks() {
11     }
12 
13     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
14     private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
15     private static final int BASE_SLEEP_TIME_MS = 1000;
16     private static final int MAX_RETRIES = 3;
17 
18     /**
19      * Define the default retry policy
20      */
21     private static final RetryPolicy DEFAULT_RETRY_POLICY = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES);
22 
23     /**
24      * Create a new ZkClient with custom connectString, default sessionTimeout and default connectionTimeout.
25      */
26     public static ZkClient newZkClient(String connectString) {
27         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, DEFAULT_ZK_SESSION_TIMEOUT_MS,
28                 DEFAULT_ZK_CONNECTION_TIMEOUT_MS, DEFAULT_RETRY_POLICY);
29         return new ZkClient(client);
30     }
31 
32     /**
33      * Create a new ZkClient with custom connectString, sessionTimeout and connectionTimeout
34      */
35     public static ZkClient newZkClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs) {
36         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs,
37                 connectionTimeoutMs, DEFAULT_RETRY_POLICY);
38         return new ZkClient(client);
39     }
40 
41     /**
42      * Create a new DistributedLock with ZkClient and lock path.
43      */
44     public static DistributedLock newZkDistributedLock(ZkClient zkClient, String lockPath) {
45         return new ZkDistributedLock(zkClient, lockPath);
46     }
47 }

TestZkDistributedLock.java

 1 package org.fool.spring.test;
 2 
 3 import org.fool.spring.util.DistributedLock;
 4 import org.fool.spring.util.DistributedLocks;
 5 import org.fool.spring.util.ZkClient;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import java.util.concurrent.TimeUnit;
10 
11 public class TestZkDistributedLock {
12 
13     private static final Logger logger = LoggerFactory.getLogger(TestZkDistributedLock.class);
14 
15     private static final String lockPath = &quot;/curator/test&quot;;
16 
17     public static void main(String[] args) throws Exception {
18         ZkClient zkClient = DistributedLocks.newZkClient(&quot;127.0.0.1:2181&quot;);
19         zkClient.start();
20 
21         DistributedLock lock = DistributedLocks.newZkDistributedLock(zkClient, lockPath);
22 
23         boolean isAcquired = lock.isAcquiredInThisProcess();
24         logger.info(&quot;==========lock acquired: &quot; + isAcquired + &quot;==========&quot;);
25 
26         if (lock.tryLock(3, TimeUnit.SECONDS)) {
27             try {
28                 isAcquired = lock.isAcquiredInThisProcess();
29                 logger.info(&quot;==========lock acquired: &quot; + isAcquired + &quot;==========&quot;);
30 
31                 // mock to do business logic
32                 Thread.sleep(60000);
33             } finally {
34                 lock.unlock();
35                 logger.info(&quot;==========release the lock !!!==========&quot;);
36             }
37         } else {
38             logger.info(&quot;==========failed to get the lock !!!==========&quot;);
39         }
40 
41         zkClient.close();
42     }
43 }

Test

执行TestZkDistributedLock,模拟业务执行占用60s时间

![]()

在60s内,再次执行TestZkDistributedLock,可以看到尝试获取锁失败

![]()

打开zk client,查看执行期间内的顺序临时节点的变化情况

![]()

Summary

Zookeeper实现的分布式锁

优点

  • Zookeeper是集群实现,可以避免单点问题,且能保证每次操作都可以有效地释放锁,这是因为一旦应用服务挂掉了,临时节点会因为session连接断开而自动删除掉。

缺点

  • 由于频繁地创建和删除结点,加上大量的Watch事件,对Zookeeper集群来说,压力非常大。且从性能上来说,与Redis实现的分布式锁相比,还是存在一定的差距。

Reference

<https://time.geekbang.org/column/article/125983&gt;

<http://curator.apache.org/&gt;

声明:该文章系转载,转载该文章的目的在于更广泛的传递信息,并不代表本网站赞同其观点,文章内容仅供参考。

本站是一个个人学习和交流平台,网站上部分文章为网站管理员和网友从相关媒体转载而来,并不用于任何商业目的,内容为作者个人观点, 并不代表本网站赞同其观点和对其真实性负责。

我们已经尽可能的对作者和来源进行了通告,但是可能由于能力有限或疏忽,导致作者和来源有误,亦可能您并不期望您的作品在我们的网站上发布。我们为这些问题向您致歉,如果您在我站上发现此类问题,请及时联系我们,我们将根据您的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。