org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话
它提供以下几类主要方法 :
功能
描述
create
在本地目录树中创建一个节点
delete
删除一个节点
exists
测试本地是否存在目标节点
get/set data
从目标节点上读取 / 写数据
get/set ACL
获取 / 设置目标节点访问控制列表信息
get children
检索一个子节点上的列表
sync
等待要被传送的数据
表 1 : ZooKeeper API 描述
package cn.com.toto.zk;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
public class SimpleDemo {
//回话超时时间,设置为与系统默认时间一致
private static final int SESSION_TIMEOUT = 30000;
//创建ZooKeeper实例
ZooKeeper zk;
//创建Watcher实例
Watcher wh = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.toString());
}
};
//初始化ZooKeeper实例
private void createZKInstance() throws IOException {
zk = new ZooKeeper("hadoop:2181,hadoop2:2181,hadoop3:2181",SimpleDemo.SESSION_TIMEOUT,this.wh);
}
private void ZKOperations() throws KeeperException, InterruptedException {
System.out.println("/n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent");
zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("/n2.查看是否创建成功:");
System.out.println(new String(zk.getData("/zoo2", false, null)));
System.out.println("/n3.修改节点数据");
zk.setData("/zoo2", "toto".getBytes(), -1);
System.out.println("/n4.查看是否修改成功:");
System.out.println(new String(zk.getData("/zoo2", false, null)));
System.out.println("/n5.删除节点");
zk.delete("/zoo2", -1);
System.out.println("/n6.查看节点是否被删除:");
System.out.println("节点状态:[" + zk.exists("/zoo2", false) + "]");
}
private void ZKClose() throws InterruptedException {
zk.close();
}
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
SimpleDemo dm = new SimpleDemo();
dm.createZKInstance();
dm.ZKOperations();
dm.ZKClose();
}
}
运行结果:
/n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent
一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection
信息: Socket connection established to hadoop3/192.168.106.82:2181, initiating session
一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread onConnected
信息: Session establishment complete on server hadoop3/192.168.106.82:2181, sessionid = 0x35983c177d00007, negotiated timeout = 30000
WatchedEvent state:SyncConnected type:None path:null
/n2.查看是否创建成功:
myData2
/n3.修改节点数据
/n4.查看是否修改成功:
toto
/n5.删除节点
/n6.查看节点是否被删除:
节点状态:[null]
Zookeeper的监听器工作机制
监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑
监听器的注册是在获取数据的操作中实现:
getData(path,watch?)监听的事件是:节点数据变化事件
getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件
所需jar包:
图1 项目包结构
package cn.com.toto.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
public class SimpleZkClient {
private static final String connectString = "192.168.106.80:2181,192.168.106.81:2181,192.168.106.82:2181";
private static final int sessionTimeout = 2000;
// latch就相当于一个对象锁,当latch.await()方法执行时,方法所在的线程会等待
//当latch的count减为0时,将会唤醒等待的线程
CountDownLatch latch = new CountDownLatch(1);
ZooKeeper zkClient = null;
@Before
public void init() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//事件监听回调方法
@Override
public void process(WatchedEvent event) {
if (latch.getCount() > 0 && event.getState() == KeeperState.SyncConnected) {
System.out.println("countdown");
latch.countDown();
}
//收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)
System.out.println(event.getType() + "---" + event.getPath());
System.out.println(event.getState());
}
});
latch.await();
}
//创建数据节点到zk中
@Test
public void testCreate() throws KeeperException, InterruptedException {
//参数1:要创建的节点的路径 参数2:节点大数据参数3:节点的权限 参数4:节点的类型
String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//上传的数据可以是任何类型,但都要转成byte
zkClient.close();
}
//判断znode是否存在
@Test
public void testExist() throws KeeperException, InterruptedException {
Stat stat = zkClient.exists("/eclipse", false);
System.out.println(stat == null ? "not exist" : "exist");
}
//获取znode下的孩子节点
@Test
public void getChildren() throws KeeperException, InterruptedException {
List<String> children = zkClient.getChildren("/", true);
for(String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
//获取参数
@Test
public void getData() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData("/eclipse", true, null);
System.out.println(new String(data));
Thread.sleep(Long.MAX_VALUE);
}
//删除znode
@Test
public void deleteZnode() throws InterruptedException, KeeperException {
//参数2:指定要删除的版本,-1表示删除所有版本
zkClient.delete("/eclipse", -1);
}
//设置参数
@Test
public void setData() throws Exception {
//要注意,这里的/zookeeper 要在zookeeper中的节点中有
zkClient.setData("/zookeeper", "imissyou angelababy".getBytes(), -1);
byte[] data = zkClient.getData("/zookeeper", false, null);
System.out.println(new String(data));
}
}
案例二
package cn.com.toto.zk;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import com.sun.org.apache.bcel.internal.generic.NEW;
public class TestZKclient {
static ZooKeeper zk = null;
public static void main(String[] args) throws Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
zk = new ZooKeeper("hadoop:2181",2000,new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println(event.getPath());
System.out.println(event.getType());
try {
zk.getChildren("/zookeeper", true);
} catch (Exception e) {
e.printStackTrace();
}
}
});
countDownLatch.await();
/**
zk.create("/myboys", "丑陋型".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.close();
**/
/**
byte[] data = zk.getData("/myboys", true, null);
System.out.println(new String(data,"UTF-8"));
Thread.sleep(Long.MAX_VALUE);
**/
/**
zk.create("/myboys/wangkai", "测试型".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.close();
**/
/**
List<String> children = zk.getChildren("/myboys", true);
for(String child : children) {
System.out.println(child);
}
**/
/**zk.delete("/myboys/wangkai", -1);**/
/**zk.setData("/myboys", "fasdfasdf".getBytes(), -1);**/
/**
byte[] data = zk.getData("/myboys", true, null);
System.out.println(new String(data,"UTF-8"));
**/
Stat stat = zk.exists("/mywives", true);
System.out.println(stat == null ? "确实不存在" : "存在");
zk.close();
}
}
拷贝ZooKeeper安装目录下的zookeeper.x.x.x.jar文件到项目的classpath路径下.
首先需要创建ZooKeeper对象, 后续的一切操作都是基于该对象进行的.
1. ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException
以下为各个参数的详细说明:
· connectString. zookeeper server列表, 以逗号隔开. ZooKeeper对象初始化后, 将从server列表中选择一个server, 并尝试与其建立连接. 如果连接建立失败, 则会从列表的剩余项中选择一个server, 并再次尝试建立连接.
· sessionTimeout. 指定连接的超时时间.
· watcher. 事件回调接口.
注意, 创建ZooKeeper对象时, 只要对象完成初始化便立刻返回. 建立连接是以异步的形式进行的, 当连接成功建立后, 会回调watcher的process方法. 如果想要同步建立与server的连接, 需要自己进一步封装.
1. public class ZKConnection {
2. /**
3. * server列表, 以逗号分割
4. */
5. protected String hosts = "localhost:4180,localhost:4181,localhost:4182";
6. /**
7. * 连接的超时时间, 毫秒
8. */
9. private static final int SESSION_TIMEOUT = 5000;
10. private CountDownLatch connectedSignal = new CountDownLatch(1);
11. protected ZooKeeper zk;
12.
13. /**
14. * 连接zookeeper server
15. */
16. public void connect() throws Exception {
17. zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher());
18. // 等待连接完成
19. connectedSignal.await();
20. }
21.
22. public class ConnWatcher implements Watcher {
23. public void process(WatchedEvent event) {
24. // 连接建立, 回调process接口时, 其event.getState()为KeeperState.SyncConnected
25. if (event.getState() == KeeperState.SyncConnected) {
26. // 放开闸门, wait在connect方法上的线程将被唤醒
27. connectedSignal.countDown();
28. }
29. }
30. }
31. }
ZooKeeper对象的create方法用于创建znode.
1. String create(String path, byte[] data, List acl, CreateMode createMode);
以下为各个参数的详细说明:
· path. znode的路径.
· data. 与znode关联的数据.
· acl. 指定权限信息, 如果不想指定权限, 可以传入Ids.OPEN_ACL_UNSAFE.
· 指定znode类型. CreateMode是一个枚举类, 从中选择一个成员传入即可. 关于znode类型的详细说明, 可参考本人的上一篇博文.
1. /**
2. * 创建临时节点
3. */
4. public void create(String nodePath, byte[] data) throws Exception {
5. zk.create(nodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
6. }
ZooKeeper对象的getChildren方法用于获取子node列表.
1. List getChildren(String path, boolean watch);
watch参数用于指定是否监听path node的子node的增加和删除事件, 以及path node本身的删除事件.
ZooKeeper对象的exists方法用于判断指定znode是否存在.
1. Stat exists(String path, boolean watch);
watch参数用于指定是否监听path node的创建, 删除事件, 以及数据更新事件. 如果该node存在, 则返回该node的状态信息, 否则返回null.
ZooKeeper对象的getData方法用于获取node关联的数据.
1. byte[] getData(String path, boolean watch, Stat stat);
watch参数用于指定是否监听path node的删除事件, 以及数据更新事件, 注意, 不监听path node的创建事件, 因为如果path node不存在, 该方法将抛出KeeperException.NoNodeException异常. stat参数是个传出参数, getData方法会将path node的状态信息设置到该参数中.
ZooKeeper对象的setData方法用于更新node关联的数据.
1. Stat setData(final String path, byte data[], int version);
data为待更新的数据. version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查.返回path node的状态信息.
ZooKeeper对象的delete方法用于删除znode.
1. void delete(final String path, int version);
version参数的作用同setData方法.
请查看ZooKeeper对象的API文档.
· znode中关联的数据不能超过1M. zookeeper的使命是分布式协作, 而不是数据存储.
· getChildren, getData, exists方法可指定是否监听相应的事件. 而create, delete, setData方法则会触发相应的事件的发生.
· 以上介绍的几个方法大多存在其异步的重载方法, 具体请查看API说明.
相关资源:敏捷开发V1.0.pptx