Curator介绍
Curator是Netflix开源的一套ZooKeeper客户端框架。Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层,应用方在使用的时候需要自己处理很多事情,于是在它的基础上封装了一下,提供了一套更好用的客户端框架。
Curator列举的ZooKeeper使用过程中的几个问题:
- 初始化连接的问题:
在client与server之间握手建立连接的过程中,如果握手失败,执行所有的同步方法(比如create,getData等)将抛出异常。 - 自动恢复(failover)的问题:
当client与一台server的连接丢失,并试图去连接另外一台server时,client将回到初始连接模式。 - session过期的问题:
在极端情况下,出现ZooKeeper session过期,客户端需要自己去监听该状态并重新创建ZooKeeper实例。 - 对可恢复异常的处理:
当在server端创建一个有序ZNode,而在将节点名返回给客户端时崩溃,此时client端抛出可恢复的异常,用户需要自己捕获这些异常并进行重试。 - 使用场景的问题:
Zookeeper提供了一些标准的使用场景支持,但是ZooKeeper对这些功能的使用说明文档很少,而且很容易用错。在一些极端场景下如何处理,zk并没有给出详细的文档说明。比如共享锁服务,当服务器端创建临时顺序节点成功,但是在客户端接收到节点名之前挂掉了,如果不能很好的处理这种情况,将导致死锁。
Curator主要从以下几个方面降低了zk使用的复杂性:
- 重试机制:提供可插拔的重试机制,它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿)。
- 连接状态监控:Curator初始化之后会一直的对zk连接进行监听,一旦发现连接状态发生变化,将作出相应的处理。
- zk客户端实例管理:Curator对zk客户端到server集群连接进行管理。并在需要的情况,重建zk客户端实例,保证与zk集群的可靠连接。
- 各种使用场景支持:Curator实现zk支持的大部分使用场景支持(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践,并考虑了各种极端情况。
Curator通过以上的处理,让用户专注于自身的业务本身,而无需花费更多的精力在zk本身。
Curator几个组成部分
- Client: 是ZooKeeper客户端的一个替代品,提供了一些底层处理和相关的工具方法。
- Framework: 用来简化ZooKeeper高级功能的使用,并增加了一些新的功能,比如管理到ZooKeeper集群的连接,重试处理
- Recipes: 实现了通用ZooKeeper的recipe,该组件建立在Framework的基础之上
- Utilities:各种ZooKeeper的工具类
- Errors: 异常处理,连接,恢复等。
- Extensions: recipe扩展
创建Curator的Zookeeper客户端,并连接服务端
首先,对于ZooKeeper的连接就是创建一个CuratorFramework实例的过程,当ZooKeeper客户端内部出现异常,将自动进行重连或重试,一般会把CuratorFramework实例的创建交给工厂类CuratorFrameworkFactory。
CuratorFrameworkFactory类提供了两个方法,一个工厂方法newClient,一个构建方法build。使用工厂方法newClient可以创建一个默认的实例,而build构建方法可以对实例进行定制。当CuratorFramework实例构建完成,紧接着调用start()方法,在应用结束的时候,需要调用close()方法。
1.newClient方法
1 |
|
connectString参数是ZooKeeper服务的地址和端口号,对于集群情况下的多个ZooKeeper示例,之间使用逗号分隔。比如
String connectString = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181"
retryPolicy参数是指在连接ZK服务过程中重新连接测策略。在它的实现类
ExponentialBackoffRetry(int baseSleepTimeMs,int maxRetries)
中,baseSleepTimeMs
参数代表两次连接的等待时间,maxRetries
参数表示最大的尝试连接次数。- CuratorFramework示例创建完成,代表ZooKeeper已经连接成功,调用
start()
方法打开连接,在使用完毕后调用close()
方法关闭连接
newClient方法还存在一个重载方法,上面的代码中使用的是newClient(String connectString,RetryPolicy retryPolicy),除该方法外,它还可以指定会话(session)的过期时间以及连接的超时时间。1
2
3
4
5
6String connectString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, 3000, 1000, retryPolicy);
client.start();
System.out.println("============>use ZooKeeper, complete your operation");
client.close();
2.builder方法
1 |
|
在ZooKeeper官网中,有这样一句话:
You only need one CuratorFramework object for each ZooKeeper cluster you are connecting
这句话告诉我们在一个应用中,在每个Zookeeper集群里,只需要一个CuratorFramework(客户端)实例就足够了。CuratorFramework实例都是线程安全的,你应该在你的应用中共享同一个CuratorFramework实例。根据ZooKeeper的这个特点,可以选择使用单例模式创建一个ZK客户端:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44package com.lzumetal.zookeeper.curator.client;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClientSingleton {
private volatile static CuratorFramework client;
private static final Logger log = LoggerFactory.getLogger(ClientSingleton.class);
private ClientSingleton(String ips, int maxElapsedTimeMs, int sleepMsBetweenRetries) {
log.info("开始连接到Zookeeper服务器:{}", ips);
/*
* maxElapsedTimeMs:最长重试时间
* sleepMsBetweenRetries:重试的间隔时间
*/
RetryUntilElapsed retryUntilElapsed = new RetryUntilElapsed(maxElapsedTimeMs, sleepMsBetweenRetries);
client = CuratorFrameworkFactory.builder()
.connectString(ips)
.connectionTimeoutMs(4000)
.sessionTimeoutMs(8000)
.retryPolicy(retryUntilElapsed)
.build();
client.start();
log.info("连接完成!!!");
}
public static CuratorFramework getClient(String ips, int maxElapsedTimeMs, int sleepMsBetweenRetries) {
if (client == null) {
synchronized (ClientSingleton.class) {
if (client == null) {
new ClientSingleton(ips, maxElapsedTimeMs, sleepMsBetweenRetries);
}
}
}
return client;
}
}
使用Curator客户端进行增删改查操作
1 | public class CuratorClientTest { |
Curator的监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
Path Cache:监视一个路径(path)下:
- 1)子结点的创建
- 2)删除
- 3)以及节点数据的更新。
产生的事件会传递给注册的
PathChildrenCacheListener
。Node Cache:监视一个结点数据(data)的创建、更新、删除,并将结点的数据缓存在本地。
Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
下面就测试一下最简单的Path Watcher:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49package com.lzumetal.zookeeper.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import java.util.concurrent.TimeUnit;
public class CuratorWatcherTest {
/**
* Zookeeper info
*/
private static final String ZK_ADDRESS = "localhost:2181";
private static final String ZK_PATH = "/zktest";
private static final String ZK_ADDRESS = "localhost:2181";
private static final String ZK_PATH = "/zktest";
private static final int MAX_ELAPSED_TIME_MS = 4000;
private static final int SLEEP_MS_BETWEEN_RETRIES = 2000;
public static void main(String[] args) throws Exception {
// 1.Connect to zk
CuratorFramework client = ClientSingleton.getClient(ZK_ADDRESS, MAX_ELAPSED_TIME_MS, SLEEP_MS_BETWEEN_RETRIES);
System.out.println("zk client start successfully!");
// 2.Register watcher
PathChildrenCache watcher = new PathChildrenCache(client, ZK_PATH, true); // true:if cache data
watcher.getListenable().addListener((client1, event) -> {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
});
watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
System.out.println("Register zk watcher successfully!");
TimeUnit.MINUTES.sleep(5);
client.close();
}
}
Curator的选举
leader选举当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。
Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()
方法,说明它是当前的Leader。注意:当Listener从takeLeadership()
退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()
方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
1 | package com.lzumetal.zookeeper.curator; |
Curator的分布式锁
分布式的锁全局同步,这意味着任何一个时间点不会有两个客户端都拥有相同的锁。
1.可重入锁Shared Reentrant Lock
首先我们先看一个全局可重入的锁。 Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。 它是由类InterProcessMutex来实现。 它的构造函数为:1
public InterProcessMutex(CuratorFramework client,String path);
通过acquire获得锁,并提供超时机制:
1 | public void acquire() |
Acquire the mutex - blocking until it’s available。Note: the same thread can call acquirere-entrantly。Each call to acquire must be balanced by a call to release()
1 | public boolean acquire(long time,TimeUnit unit) |
Acquire the mutex - blocks until it’s available or the given time expires。
Note: the same thread cancall acquire re-entrantly。Each call to acquire that returns true must be balanced by a call to release()
Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired,false if not
通过release()方法释放锁。 InterProcessMutex 实例可以重用。
Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。 为了撤销mutex,调用下面的方法:
1 | public void makeRevocable(RevocationListener<T> listener) |
将锁设为可撤销的。当别的进程或线程想让你释放锁是Listener会被调用。
Parameters:
listener - the listener
如果你请求撤销当前的锁, 调用Revoker方法。
1 | public static void attemptRevoke(CuratorFramework client, |
to mark a lock for revocation。Assuming that the lock has been registeredwith a RevocationListener,it will get called and the lock should be released。Note,however,that revocation is cooperative。Parameters:client - the clientpath - the path of the lock - usually from something like InterProcessMutex。getParticipantNodes()
错误处理 还是强烈推荐你使用ConnectionStateListener处理连接状态的改变。 当连接LOST时你不再拥有锁。首先让我们创建一个模拟的共享资源, 这个资源期望只能单线程的访问,否则会有并发问题。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29package com.lzumetal.zookeeper.curator.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
public void use() throws InterruptedException {
/*
* 真实环境中我们会在这里访问/维护一个共享的资源
* 这个例子在使用锁的情况下不会非法并发异常IllegalStateException
* 但是在无锁的情况由于sleep了一段时间,很容易抛出异常
*/
if (!inUse.compareAndSet(false, true)) {
//抛出异常后该线程被打断终止
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
System.out.println("thread: ( " + Thread.currentThread().getName() + " ) is operating the resource");
TimeUnit.SECONDS.sleep((long) (3 * Math.random()));
System.out.println("thread: ( " + Thread.currentThread().getName() + " ) has compete operated resource");
} finally {
inUse.set(false);
}
}
}
然后创建一个ClientHandler类, 它负责请求锁, 使用资源,释放锁这样一个完整的访问过程。
1 | package com.lzumetal.zookeeper.curator; |
最后创建主程序来测试。
1 | package com.lzumetal.zookeeper.curator; |
代码也很简单,生成5个client, 每个client重复执行10次 请求锁–访问资源–释放锁的过程。每个client都在独立的线程中。 结果可以看到,锁是随机的被每个实例排他性的使用。既然是可重用的,你可以在一个线程中多次调用acquire,在线程拥有锁时它总是返回true。你不应该在多个线程中用同一个InterProcessMutex, 你可以在每个线程中都生成一个InterProcessMutex实例,它们的path都一样,这样它们可以共享同一个锁。2。不可重入锁Shared Lock这个锁和上面的相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入。 这个类是InterProcessSemaphoreMutex。 使用方法和上面的类类似。首先我们将上面的例子修改一下,测试一下它的重入。 修改ClientHandler。doWork,连续两次acquire:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public void doWork(long time, TimeUnit unit) throws Exception {
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
System.out.println(clientName + " has the lock");
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
System.out.println(clientName + " has the lock again");
try {
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
lock.release(); // always release the lock in a finally block
lock.release(); // always release the lock in a finally block
}
}
注意我们也需要调用release两次。这和JDK的ReentrantLock用法一致。如果少调用一次release,则此线程依然拥有锁。 上面的代码没有问题,我们可以多次调用acquire,后续的acquire也不会阻塞。 将上面的InterProcessMutex换成不可重入锁InterProcessSemaphoreMutex,如果再运行上面的代码,结果就会发现线程被阻塞再第二个acquire上。 也就是此锁不是可重入的。
本节相关代码已上传至GitHub:https://github.com/liaosilzu2007/zookeeper-parent.git
本文参考:http://ifeve.com/zookeeper-lock/