ZooKeeper (5): Curator, an Open-Source Client Framework

Introduction to Curator

Curator is a ZooKeeper client framework open-sourced by Netflix. While using ZooKeeper, Netflix found the bundled client too low-level: applications had to handle many details themselves. Netflix therefore wrapped it and provided a friendlier client framework.

Curator lists several problems that arise when using the raw ZooKeeper client:

  • Initial connection:
    While the client and server are performing the connection handshake, any synchronous method (such as create or getData) throws an exception if the handshake fails.
  • Failover:
    When the client loses its connection to one server and tries to connect to another, it falls back to the initial connection mode.
  • Session expiration:
    In extreme cases the ZooKeeper session expires; the client has to watch for that state itself and recreate the ZooKeeper instance.
  • Recoverable exceptions:
    When the server creates a sequential znode but crashes before returning the node name, the client receives a recoverable exception; the user has to catch such exceptions and retry on its own.
  • Recipe/usage-scenario support:
    ZooKeeper supports a number of standard usage scenarios, but the documentation for them is sparse, they are easy to misuse, and ZooKeeper does not document how to handle some extreme cases. For example, with a shared-lock service, if the server creates the ephemeral sequential node successfully but dies before the client receives the node name, a poorly handled failure leads to deadlock.

Curator reduces the complexity of using ZooKeeper mainly in the following ways:

  • Retry mechanism: a pluggable retry mechanism that applies a configurable retry policy to every recoverable exception, with several standard policies built in (such as exponential back-off).
  • Connection state monitoring: after initialization, Curator keeps watching the ZooKeeper connection and reacts whenever the connection state changes.
  • Client instance management: Curator manages the connections from the ZooKeeper client to the server ensemble and, when necessary, rebuilds the client instance to keep a reliable connection to the cluster.
  • Recipe support: Curator implements most of the recipes ZooKeeper supports (and even some ZooKeeper itself does not), following ZooKeeper best practices and accounting for various edge cases.

With this handling in place, Curator lets users focus on their own business logic instead of spending effort on ZooKeeper itself. The connection-state monitoring mentioned above is also exposed to applications, as shown in the sketch below.
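
As a minimal sketch (assuming a local server at 127.0.0.1:2181; the class name and messages are illustrative, not from the original article), a ConnectionStateListener can be registered on the client to observe these state changes:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ConnectionStateDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        // The listener is invoked whenever Curator detects a change of the connection state
        // (CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY).
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework c, ConnectionState newState) {
                System.out.println("connection state changed to: " + newState);
            }
        });
        client.start();
        Thread.sleep(10000); // keep the process alive long enough to observe state events
        client.close();
    }
}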

Components of Curator

  • Client: a replacement for the ZooKeeper client, providing low-level handling and related utility methods.
  • Framework: simplifies the use of ZooKeeper's higher-level features and adds new ones, such as managing connections to the ZooKeeper ensemble and retry handling.
  • Recipes: implementations of the common ZooKeeper recipes, built on top of Framework.
  • Utilities: assorted ZooKeeper utility classes.
  • Errors: exception handling, connection issues, recovery, and so on.
  • Extensions: recipe extensions.

Creating a Curator ZooKeeper client and connecting to the server

Connecting to ZooKeeper boils down to creating a CuratorFramework instance; when an exception occurs inside the ZooKeeper client, Curator reconnects or retries automatically. Creation of the CuratorFramework instance is usually delegated to the factory class CuratorFrameworkFactory.
CuratorFrameworkFactory offers two approaches: the factory method newClient, and a builder obtained via builder(). newClient creates an instance with defaults, while the builder lets you customize the instance. Once the CuratorFramework instance has been built, call start(); when the application finishes, call close().

1. The newClient method

@Test
public void createByNew() {
    String connectString = "127.0.0.1:2181";
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
    client.start();
    System.out.println("============>use ZooKeeper, complete your operation");
    client.close();
}
  1. The connectString parameter is the address and port of the ZooKeeper service; for a cluster with multiple ZooKeeper instances, separate them with commas, for example
    String connectString = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181"

  2. The retryPolicy parameter is the reconnection policy used while connecting to the ZooKeeper service. In the implementation ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries), baseSleepTimeMs is the initial wait time between two connection attempts and maxRetries is the maximum number of retries (other built-in policies are shown in the sketch after this list).

  3. Creating the CuratorFramework instance does not by itself connect to ZooKeeper: call start() to open the connection, and call close() to release it when you are done.
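
Besides ExponentialBackoffRetry, Curator ships several other standard retry policies. The following fragment (the timing values are arbitrary examples) shows a few equivalent ways to build a RetryPolicy:

// All of these implement org.apache.curator.RetryPolicy and can be passed to newClient()/builder()
RetryPolicy exponential  = new ExponentialBackoffRetry(1000, 3); // exponential back-off, at most 3 retries
RetryPolicy nTimes       = new RetryNTimes(5, 1000);             // retry at most 5 times, 1000 ms apart
RetryPolicy oneTime      = new RetryOneTime(1000);               // retry exactly once, after 1000 ms
RetryPolicy untilElapsed = new RetryUntilElapsed(10000, 1000);   // keep retrying for up to 10 s, 1000 ms apart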

newClient also has an overload. The code above uses newClient(String connectString, RetryPolicy retryPolicy); the other overload additionally lets you specify the session timeout and the connection timeout.

String connectString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
// newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy)
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, 3000, 1000, retryPolicy);
client.start();
System.out.println("============>use ZooKeeper, complete your operation");
client.close();

2. The builder method

@Test
public void createByBuilder() {
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .retryPolicy(retryPolicy)
            .sessionTimeoutMs(1000 * 6)
            .connectionTimeoutMs(1000 * 6)
            .build();
    client.start();
    System.out.println("============>use ZooKeeper, complete your operation");
    client.close();
}

The Curator documentation makes this point:

You only need one CuratorFramework object for each ZooKeeper cluster you are connecting to

In other words, within an application you only need one CuratorFramework (client) instance per ZooKeeper cluster. CuratorFramework instances are thread-safe, so you should share a single instance across your application. Given this, you can use the singleton pattern to create the ZooKeeper client:

package com.lzumetal.zookeeper.curator.client;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ClientSingleton {

    private volatile static CuratorFramework client;
    private static final Logger log = LoggerFactory.getLogger(ClientSingleton.class);

    private ClientSingleton(String ips, int maxElapsedTimeMs, int sleepMsBetweenRetries) {
        log.info("Connecting to ZooKeeper server(s): {}", ips);
        /*
         * maxElapsedTimeMs: maximum total retry time
         * sleepMsBetweenRetries: interval between retries
         */
        RetryUntilElapsed retryUntilElapsed = new RetryUntilElapsed(maxElapsedTimeMs, sleepMsBetweenRetries);
        client = CuratorFrameworkFactory.builder()
                .connectString(ips)
                .connectionTimeoutMs(4000)
                .sessionTimeoutMs(8000)
                .retryPolicy(retryUntilElapsed)
                .build();
        client.start();
        log.info("Connected!");
    }

    public static CuratorFramework getClient(String ips, int maxElapsedTimeMs, int sleepMsBetweenRetries) {
        // Double-checked locking: the private constructor assigns the shared static client exactly once
        if (client == null) {
            synchronized (ClientSingleton.class) {
                if (client == null) {
                    new ClientSingleton(ips, maxElapsedTimeMs, sleepMsBetweenRetries);
                }
            }
        }
        return client;
    }

}

CRUD operations with the Curator client

package com.lzumetal.zookeeper.curator;

import com.lzumetal.zookeeper.curator.client.ClientSingleton;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class CuratorClientTest {

    private static final Logger log = LoggerFactory.getLogger(CuratorClientTest.class);

    private static final String IPS = "localhost:2181";
    public static final int MAXELAPSEDTIMEMS = 4000;      // maximum total retry time
    public static final int SLEEPMSBETWEENRETRIES = 2000; // interval between retries
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws Exception {

        /**
         * What this method prints (expressed as the equivalent zkCli commands):
         *
         * $ create /zktest hello
         * $ ls /
         * [zktest, zookeeper]
         * $ get /zktest
         * hello
         * $ set /zktest world
         * $ get /zktest
         * world
         * $ delete /zktest
         * $ ls /
         * [zookeeper]
         */

        // 1.Connect to zk
        CuratorFramework client = ClientSingleton.getClient(IPS, MAXELAPSEDTIMEMS, SLEEPMSBETWEENRETRIES);

        // 2.Client API test
        // 2.1 Create node
        String data1 = "hello";
        print("create", ZK_PATH, data1);
        client.create()
                .creatingParentsIfNeeded()
                .forPath(ZK_PATH, data1.getBytes());

        // 2.2 Get node and data
        print("ls", "/");
        print(client.getChildren().forPath("/"));
        print("get", ZK_PATH);
        print(client.getData().forPath(ZK_PATH));

        // 2.3 Modify data
        String data2 = "world";
        print("set", ZK_PATH, data2);
        client.setData().forPath(ZK_PATH, data2.getBytes());
        print("get", ZK_PATH);
        print(client.getData().forPath(ZK_PATH));

        // 2.4 Remove node
        print("delete", ZK_PATH);
        client.delete().forPath(ZK_PATH);
        print("ls", "/");
        print(client.getChildren().forPath("/"));

        client.close();
    }

    private static void print(String... cmds) {
        StringBuilder text = new StringBuilder("$ ");
        for (String cmd : cmds) {
            text.append(cmd).append(" ");
        }
        log.warn(text.toString());
    }

    private static void print(Object result) {
        log.warn(result instanceof byte[] ? new String((byte[]) result) : result.toString());
    }
}

Curator listeners (watchers)

Curator provides three kinds of watchers (caches) for listening to node changes:

  • Path Cache: watches a path for:

    • 1) creation of child nodes
    • 2) deletion of child nodes
    • 3) updates to child node data.

    The resulting events are delivered to the registered PathChildrenCacheListener.

  • Node Cache: watches a single node for creation, update, and deletion of its data, and caches the node's data locally (a minimal sketch appears after the Path Cache example below).

  • Tree Cache: a combination of Path Cache and Node Cache; it watches for create, update, and delete events under a path and caches the data of all children under that path.

Below is a test of the simplest one, the Path Cache watcher:

package com.lzumetal.zookeeper.curator;

import com.lzumetal.zookeeper.curator.client.ClientSingleton;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;

import java.util.concurrent.TimeUnit;

public class CuratorWatcherTest {

    /**
     * Zookeeper info
     */
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String ZK_PATH = "/zktest";
    private static final int MAX_ELAPSED_TIME_MS = 4000;
    private static final int SLEEP_MS_BETWEEN_RETRIES = 2000;

    public static void main(String[] args) throws Exception {

        // 1.Connect to zk
        CuratorFramework client = ClientSingleton.getClient(ZK_ADDRESS, MAX_ELAPSED_TIME_MS, SLEEP_MS_BETWEEN_RETRIES);
        System.out.println("zk client start successfully!");

        // 2.Register watcher
        PathChildrenCache watcher = new PathChildrenCache(client, ZK_PATH, true); // true: cache node data

        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData()) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        System.out.println("Register zk watcher successfully!");

        TimeUnit.MINUTES.sleep(5);
        client.close();
    }

}
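
For comparison, here is a minimal Node Cache fragment (hedged: it reuses the /zktest path and an already started client from the example above; the print statements are illustrative):

// A NodeCache watches a single znode and keeps its latest data locally
NodeCache nodeCache = new NodeCache(client, "/zktest");
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        ChildData current = nodeCache.getCurrentData(); // null if the node was deleted
        if (current == null) {
            System.out.println("/zktest was deleted");
        } else {
            System.out.println("/zktest changed, data=" + new String(current.getData()));
        }
    }
});
nodeCache.start(true); // true: build the initial cache eagerly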

Leader election with Curator

Leader election: when a service in the cluster goes down, we may need to elect a new master from the slave nodes, which requires a leader election mechanism that coordinates automatically in a distributed environment.
Curator provides the LeaderSelector listener to implement leader election. At any moment, only one listener is inside its takeLeadership() method, which means it is the current leader. Note: when a listener returns from takeLeadership(), it relinquishes leadership, and Curator then uses ZooKeeper to elect a new leader from the remaining listeners. The autoRequeue() method gives a listener that has relinquished leadership a chance to become leader again; without it, a listener that has given up leadership never becomes leader again.

package com.lzumetal.zookeeper.curator;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;

/**
 * Curator framework's leader election test.
 * Output:
 * LeaderSelector-2 take leadership!
 * LeaderSelector-2 relinquish leadership!
 * LeaderSelector-1 take leadership!
 * LeaderSelector-1 relinquish leadership!
 * LeaderSelector-0 take leadership!
 * LeaderSelector-0 relinquish leadership!
 * ...
 */
public class CuratorLeaderTest {

    /** Zookeeper info */
    private static final String ZK_ADDRESS = "192.168.1.100:2181";
    private static final String ZK_PATH = "/zktest";

    public static void main(String[] args) throws InterruptedException {

        LeaderSelectorListener listener = new LeaderSelectorListener() {
            @Override
            public void takeLeadership(CuratorFramework client) throws Exception {
                System.out.println(Thread.currentThread().getName() + " take leadership!");

                // takeLeadership() method should only return when leadership is being relinquished.
                Thread.sleep(5000L);

                System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState state) {
            }
        };

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        new Thread(() -> {
            registerListener(listener);
        }).start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    private static void registerListener(LeaderSelectorListener listener) {
        // 1.Connect to zk
        CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,
                new RetryNTimes(10, 5000));
        client.start();

        // 2.Ensure path
        try {
            new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
        } catch (Exception e) {
            e.printStackTrace();
        }

        // 3.Register listener
        LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
        selector.autoRequeue();
        selector.start();
    }


}

Distributed locks with Curator

A distributed lock is globally synchronized: at no point in time do two clients hold the same lock.

1. Reentrant lock: Shared Reentrant Lock

First let's look at the globally reentrant lock. Shared means the lock is globally visible and any client can request it. Reentrant works like the JDK's ReentrantLock: a client that already owns the lock can acquire it again without blocking. It is implemented by the class InterProcessMutex, whose constructor is:

public InterProcessMutex(CuratorFramework client, String path);

The lock is obtained with acquire(), which also has a timed variant:

public void acquire()

Acquire the mutex - blocking until it's available. Note: the same thread can call acquire re-entrantly. Each call to acquire must be balanced by a call to release().

public boolean acquire(long time, TimeUnit unit)

Acquire the mutex - blocks until it's available or the given time expires.
Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release().

Parameters:
time - time to wait
unit - time unit

Returns:
true if the mutex was acquired, false if not

The lock is released with release(). InterProcessMutex instances are reusable.
Revocation: the ZooKeeper recipes wiki defines a cooperative mechanism for revoking a mutex. To make a mutex revocable, call the following method:

public void makeRevocable(RevocationListener<T> listener)

Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.

Parameters:
listener - the listener

If you want to request that the current lock be revoked, call the Revoker method:

public static void attemptRevoke(CuratorFramework client, String path) throws Exception

Utility to mark a lock for revocation. Assuming that the lock has been registered with a RevocationListener, it will get called and the lock should be released. Note, however, that revocation is cooperative. Parameters: client - the client; path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
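
Putting the two pieces together, a rough fragment might look like this (hedged: client is an already started CuratorFramework, and lockNodePath is a hypothetical placeholder for a lock node path obtained from getParticipantNodes()):

InterProcessMutex lock = new InterProcessMutex(client, "/examples/locks");
// Make the lock revocable: the listener fires when some other client requests revocation
lock.makeRevocable(new RevocationListener<InterProcessMutex>() {
    @Override
    public void revocationRequested(InterProcessMutex forLock) {
        try {
            forLock.release(); // cooperate by releasing the lock
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});

// Elsewhere, another client asks the current holder to give up the lock;
// attemptRevoke declares throws Exception, so call it from a method that handles it
Revoker.attemptRevoke(client, lockNodePath);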

Error handling: it is still strongly recommended that you use a ConnectionStateListener to handle connection state changes. When the connection is LOST, you no longer own the lock. First let's create a simulated shared resource that is expected to be accessed by only one thread at a time, otherwise concurrency problems arise.

package com.lzumetal.zookeeper.curator.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


public class FakeLimitedResource {

    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        /*
         * In a real environment we would access/maintain a shared resource here.
         * With the lock, this example never throws IllegalStateException from concurrent use;
         * without the lock, the sleep below makes the exception very likely.
         */
        if (!inUse.compareAndSet(false, true)) {
            // The thread that hits this exception is interrupted and terminated
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            System.out.println("thread: ( " + Thread.currentThread().getName() + " ) is operating the resource");
            TimeUnit.SECONDS.sleep((long) (3 * Math.random()));
            System.out.println("thread: ( " + Thread.currentThread().getName() + " ) has finished operating the resource");
        } finally {
            inUse.set(false);
        }
    }
}

Next, create a ClientHandler class that performs the complete access cycle: request the lock, use the resource, release the lock.

package com.lzumetal.zookeeper.curator;

import com.lzumetal.zookeeper.curator.lock.FakeLimitedResource;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import java.util.concurrent.TimeUnit;


public class ClientHandler {

    private final InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ClientHandler(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " has the lock");
            resource.use(); // access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }


    public void doWorkWithoutLock() throws Exception {
        try {
            System.out.println(clientName + " start to do work");
            resource.use();
        } finally {
            System.out.println(clientName + " complete work");
        }
    }
}

Finally, create a main program to run the test.

package com.lzumetal.zookeeper.curator;

import com.lzumetal.zookeeper.curator.lock.FakeLimitedResource;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


public class InterProcessMutexExample {


    private static final int QTY = 5;
    private static final int REPETITIONS = 10;
    private static final String PATH = "/examples/locks";
    private static final String ZK_ADDRESS = "localhost:2181";


    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService threadPool = Executors.newFixedThreadPool(QTY);
        for (int i = 0; i < QTY; ++i) {
            final int index = i;
            Runnable task = () -> {
                CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, new ExponentialBackoffRetry(1000, 3));
                try {
                    client.start();
                    final ClientHandler clientHandler = new ClientHandler(client, PATH, resource, "Client " + index);
                    for (int j = 0; j < REPETITIONS; ++j) {
                        //clientHandler.doWork(10, TimeUnit.SECONDS);
                        clientHandler.doWorkWithoutLock();
                    }
                } catch (Throwable e) {
                    e.printStackTrace();
                } finally {
                    CloseableUtils.closeQuietly(client);
                }
            };
            threadPool.submit(task);
        }
        threadPool.shutdown();
        threadPool.awaitTermination(3, TimeUnit.MINUTES);
    }
}

The code is straightforward: it creates 5 clients, and each client, running in its own thread, repeats the acquire-lock / use-resource / release-lock cycle 10 times. The output shows the lock being used exclusively by one instance at a time. Since the lock is reentrant, you can call acquire multiple times in one thread; while the thread owns the lock, acquire always returns true. You should not use the same InterProcessMutex from multiple threads; instead, create one InterProcessMutex instance per thread with the same path, so they share the same lock.

2. Non-reentrant lock: Shared Lock

Compared with the lock above, this one simply lacks the Reentrant capability, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex, and it is used in the same way as the class above. First, let's modify the example above to test reentrancy: change ClientHandler.doWork to call acquire twice in a row:

public void doWork(long time, TimeUnit unit) throws Exception {
    if (!lock.acquire(time, unit)) {
        throw new IllegalStateException(clientName + " could not acquire the lock");
    }
    System.out.println(clientName + " has the lock");
    if (!lock.acquire(time, unit)) {
        throw new IllegalStateException(clientName + " could not acquire the lock");
    }
    System.out.println(clientName + " has the lock again");

    try {
        resource.use(); // access resource exclusively
    } finally {
        System.out.println(clientName + " releasing the lock");
        lock.release(); // always release the lock in a finally block
        lock.release(); // always release the lock in a finally block
    }
}

Note that we also have to call release twice, just as with the JDK's ReentrantLock. If release is called one time too few, the thread still owns the lock. The code above works fine: acquire can be called multiple times, and the later acquire calls do not block. If you replace InterProcessMutex with the non-reentrant InterProcessSemaphoreMutex and run the code again, the thread blocks on the second acquire: this lock is not reentrant.
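
For reference, a minimal fragment of the non-reentrant variant is sketched below (hedged: it reuses the client, the /examples/locks path, and the FakeLimitedResource resource from the examples above); only the lock class changes, since acquire/release have the same shape:

InterProcessSemaphoreMutex semaphoreMutex = new InterProcessSemaphoreMutex(client, "/examples/locks");
if (semaphoreMutex.acquire(10, TimeUnit.SECONDS)) {
    try {
        // a second semaphoreMutex.acquire(...) here would block: this lock is NOT reentrant
        resource.use();
    } finally {
        semaphoreMutex.release();
    }
}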


The code for this section has been uploaded to GitHub: https://github.com/liaosilzu2007/zookeeper-parent.git
Reference: http://ifeve.com/zookeeper-lock/

------ End of article ------