Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料)...

    xiaoxiao2023-08-06  143

    9.1 基本使用

    org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话

    它提供以下几类主要方法  :

    功能

    描述

    create

    在本地目录树中创建一个节点

    delete

    删除一个节点

    exists

    测试本地是否存在目标节点

    get/set data

    从目标节点上读取 / 写数据

    get/set ACL

    获取 / 设置目标节点访问控制列表信息

    get children

    检索一个子节点上的列表

    sync

    等待要被传送的数据

     

     

     

     

     

     

     

     

     

    表 1 : ZooKeeper API 描述

    9.2 增删改查znode数据

    package cn.com.toto.zk;

     

    import java.io.IOException;

     

    import org.apache.zookeeper.CreateMode;

    import org.apache.zookeeper.KeeperException;

    import org.apache.zookeeper.WatchedEvent;

    import org.apache.zookeeper.Watcher;

    import org.apache.zookeeper.ZooDefs.Ids;

    import org.apache.zookeeper.ZooKeeper;

     

    public class SimpleDemo {

        //回话超时时间,设置为与系统默认时间一致

        private static final int SESSION_TIMEOUT = 30000;

        //创建ZooKeeper实例

        ZooKeeper zk;

        //创建Watcher实例

        Watcher wh = new Watcher() {

       

           @Override

           public void process(WatchedEvent event) {

               System.out.println(event.toString());

           }

        };

       

        //初始化ZooKeeper实例

        private void createZKInstance() throws IOException {

           zk = new ZooKeeper("hadoop:2181,hadoop2:2181,hadoop3:2181",SimpleDemo.SESSION_TIMEOUT,this.wh);

        }

       

        private void ZKOperations() throws KeeperException, InterruptedException {

           System.out.println("/n1. 创建 ZooKeeper 节点 (znode zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent");

           zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

           System.out.println("/n2.查看是否创建成功:");

           System.out.println(new String(zk.getData("/zoo2", false, null)));

           System.out.println("/n3.修改节点数据");

           zk.setData("/zoo2", "toto".getBytes(), -1);

           System.out.println("/n4.查看是否修改成功:");

           System.out.println(new String(zk.getData("/zoo2", false, null)));

           System.out.println("/n5.删除节点");

           zk.delete("/zoo2", -1);

           System.out.println("/n6.查看节点是否被删除:");

           System.out.println("节点状态:[" + zk.exists("/zoo2", false) + "]");

        }

       

        private void ZKClose() throws InterruptedException {

           zk.close();

        }

       

        public static void main(String[] args) throws KeeperException, InterruptedException, IOException {

           SimpleDemo dm = new SimpleDemo();

           dm.createZKInstance();

           dm.ZKOperations();

           dm.ZKClose();

        }

    }

    运行结果:

    /n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent

    一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection

    信息: Socket connection established to hadoop3/192.168.106.82:2181, initiating session

    一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread onConnected

    信息: Session establishment complete on server hadoop3/192.168.106.82:2181, sessionid = 0x35983c177d00007, negotiated timeout = 30000

    WatchedEvent state:SyncConnected type:None path:null

    /n2.查看是否创建成功:

    myData2

    /n3.修改节点数据

    /n4.查看是否修改成功:

    toto

    /n5.删除节点

    /n6.查看节点是否被删除:

    节点状态:[null]

     

    9.3 监听znode

    Zookeeper的监听器工作机制

    监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑

     

     

     

    监听器的注册是在获取数据的操作中实现:

    getData(path,watch?)监听的事件是:节点数据变化事件

    getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件

     

    9.4其它案例

    所需jar包:

    图1 项目包结构

    package cn.com.toto.zk;

     

    import java.util.List;

    import java.util.concurrent.CountDownLatch;

     

    import org.apache.zookeeper.CreateMode;

    import org.apache.zookeeper.KeeperException;

    import org.apache.zookeeper.WatchedEvent;

    import org.apache.zookeeper.Watcher;

    import org.apache.zookeeper.Watcher.Event.KeeperState;

    import org.apache.zookeeper.ZooDefs.Ids;

    import org.apache.zookeeper.ZooKeeper;

    import org.apache.zookeeper.data.Stat;

    import org.junit.Before;

    import org.junit.Test;

     

    public class SimpleZkClient {

       

        private static final String connectString = "192.168.106.80:2181,192.168.106.81:2181,192.168.106.82:2181";

        private static final int sessionTimeout = 2000;

       

        // latch就相当于一个对象锁,当latch.await()方法执行时,方法所在的线程会等待

        //latchcount减为0时,将会唤醒等待的线程

        CountDownLatch latch = new CountDownLatch(1);

        ZooKeeper zkClient = null;

       

        @Before

        public void init() throws Exception {

           zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

              

               //事件监听回调方法

               @Override

               public void process(WatchedEvent event) {

                  if (latch.getCount() > 0 && event.getState() == KeeperState.SyncConnected) {

                      System.out.println("countdown");

                      latch.countDown();

                  }

                 

                  //收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)

                  System.out.println(event.getType() + "---" + event.getPath());

                  System.out.println(event.getState());

               }

           });

           latch.await();

        }

       

        //创建数据节点到zk

        @Test

        public void testCreate() throws KeeperException, InterruptedException {

           //参数1:要创建的节点的路径  参数2:节点大数据参数3:节点的权限  参数4:节点的类型

           String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            //上传的数据可以是任何类型,但都要转成byte

           zkClient.close();

        }

       

        //判断znode是否存在

        @Test

        public void testExist() throws KeeperException, InterruptedException {

           Stat stat = zkClient.exists("/eclipse", false);

           System.out.println(stat == null ? "not exist" : "exist");

        }

       

        //获取znode下的孩子节点

        @Test

        public void getChildren() throws KeeperException, InterruptedException {

            List<String> children = zkClient.getChildren("/", true);

            for(String child : children) {

                System.out.println(child);

            }

            Thread.sleep(Long.MAX_VALUE);

        }

     

        //获取参数

        @Test

        public void getData() throws KeeperException, InterruptedException {

           byte[] data = zkClient.getData("/eclipse", true, null);

           System.out.println(new String(data));

           Thread.sleep(Long.MAX_VALUE);

        }

       

        //删除znode

        @Test

        public void deleteZnode() throws InterruptedException, KeeperException {

            //参数2:指定要删除的版本,-1表示删除所有版本

           zkClient.delete("/eclipse", -1);

        }

       

        //设置参数

        @Test

        public void setData() throws Exception {

           //要注意,这里的/zookeeper 要在zookeeper中的节点中有

           zkClient.setData("/zookeeper", "imissyou angelababy".getBytes(), -1);

     

           byte[] data = zkClient.getData("/zookeeper", false, null);

           System.out.println(new String(data));

        }

    }

    案例二

    package cn.com.toto.zk;

     

    import java.util.List;

    import java.util.concurrent.CountDownLatch;

     

    import org.apache.zookeeper.CreateMode;

    import org.apache.zookeeper.WatchedEvent;

    import org.apache.zookeeper.Watcher;

    import org.apache.zookeeper.Watcher.Event.KeeperState;

    import org.apache.zookeeper.ZooDefs.Ids;

    import org.apache.zookeeper.ZooKeeper;

    import org.apache.zookeeper.data.Stat;

     

    import com.sun.org.apache.bcel.internal.generic.NEW;

     

    public class TestZKclient {

        static ZooKeeper zk = null;

       

        public static void main(String[] args) throws Exception {

            final CountDownLatch countDownLatch = new CountDownLatch(1);

            zk = new ZooKeeper("hadoop:2181",2000,new Watcher() {

              

               @Override

               public void process(WatchedEvent event) {

                  if (event.getState() == KeeperState.SyncConnected) {

                      countDownLatch.countDown();

                  }

                  System.out.println(event.getPath());

                  System.out.println(event.getType());

                  try {

                      zk.getChildren("/zookeeper", true);

                  } catch (Exception e) {

                      e.printStackTrace();

                  }

               }

           });

           

            countDownLatch.await();

           

            /**

            zk.create("/myboys", "丑陋型".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            zk.close();

            **/

           

            /**

            byte[] data = zk.getData("/myboys", true, null);

            System.out.println(new String(data,"UTF-8"));

            Thread.sleep(Long.MAX_VALUE);

            **/

           

            /**

            zk.create("/myboys/wangkai", "测试型".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            zk.close();

            **/

           

            /**

            List<String> children = zk.getChildren("/myboys", true);

            for(String child : children) {

               System.out.println(child);

            }

            **/

           

            /**zk.delete("/myboys/wangkai", -1);**/

           

            /**zk.setData("/myboys", "fasdfasdf".getBytes(), -1);**/

            /**

            byte[] data = zk.getData("/myboys", true, null);

            System.out.println(new String(data,"UTF-8"));

            **/

           

            Stat stat = zk.exists("/mywives", true);

            System.out.println(stat == null ? "确实不存在" : "存在");

            zk.close();

        }

    }

     

    9.5 其它网络参考资料

    准备工作

    拷贝ZooKeeper安装目录下的zookeeper.x.x.x.jar文件到项目的classpath路径下.

    创建连接和回调接口

    首先需要创建ZooKeeper对象, 后续的一切操作都是基于该对象进行的.

    1.  ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException  

    以下为各个参数的详细说明:

    ·         connectString. zookeeper server列表, 以逗号隔开. ZooKeeper对象初始化后, 将从server列表中选择一个server, 并尝试与其建立连接. 如果连接建立失败, 则会从列表的剩余项中选择一个server, 并再次尝试建立连接.

    ·         sessionTimeout. 指定连接的超时时间.

    ·         watcher. 事件回调接口.

    注意, 创建ZooKeeper对象时, 只要对象完成初始化便立刻返回. 建立连接是以异步的形式进行的, 当连接成功建立后, 会回调watcherprocess方法. 如果想要同步建立与server的连接, 需要自己进一步封装.

    1.  public class ZKConnection {  

    2.      /** 

    3.       * server列表以逗号分割 

    4.       */  

    5.      protected String hosts = "localhost:4180,localhost:4181,localhost:4182";  

    6.      /** 

    7.       * 连接的超时时间毫秒 

    8.       */  

    9.      private static final int SESSION_TIMEOUT = 5000;  

    10.     private CountDownLatch connectedSignal = new CountDownLatch(1);  

    11.     protected ZooKeeper zk;  

    12.   

    13.     /** 

    14.      * 连接zookeeper server 

    15.      */  

    16.     public void connect() throws Exception {  

    17.         zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher());  

    18.         // 等待连接完成  

    19.         connectedSignal.await();  

    20.     }  

    21.   

    22.     public class ConnWatcher implements Watcher {  

    23.         public void process(WatchedEvent event) {  

    24.             // 连接建立回调process接口时event.getState()KeeperState.SyncConnected  

    25.             if (event.getState() == KeeperState.SyncConnected) {  

    26.                 // 放开闸门, waitconnect方法上的线程将被唤醒  

    27.                 connectedSignal.countDown();  

    28.             }  

    29.         }  

    30.     }  

    31. }  

     

    创建znode

    ZooKeeper对象的create方法用于创建znode.

    1.  String create(String path, byte[] data, List acl, CreateMode createMode);  

    以下为各个参数的详细说明:

    ·         path. znode的路径.

    ·         data. znode关联的数据.

    ·         acl. 指定权限信息, 如果不想指定权限, 可以传入Ids.OPEN_ACL_UNSAFE.

    ·         指定znode类型. CreateMode是一个枚举类, 从中选择一个成员传入即可. 关于znode类型的详细说明, 可参考本人的上一篇博文.

    1.  /** 

    2.   * 创建临时节点 

    3.   */  

    4.  public void create(String nodePath, byte[] data) throws Exception {  

    5.      zk.create(nodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);  

    6.  }  

     

    获取子node列表

    ZooKeeper对象的getChildren方法用于获取子node列表.

    1.  List getChildren(String path, boolean watch);  

    watch参数用于指定是否监听path node的子node的增加和删除事件, 以及path node本身的删除事件.

    判断znode是否存在

    ZooKeeper对象的exists方法用于判断指定znode是否存在.

    1.  Stat exists(String path, boolean watch);  

    watch参数用于指定是否监听path node的创建, 删除事件, 以及数据更新事件. 如果该node存在, 则返回该node的状态信息, 否则返回null.

    获取node中关联的数据

    ZooKeeper对象的getData方法用于获取node关联的数据.

    1.  byte[] getData(String path, boolean watch, Stat stat);  

    watch参数用于指定是否监听path node的删除事件, 以及数据更新事件, 注意, 不监听path node的创建事件, 因为如果path node不存在, 该方法将抛出KeeperException.NoNodeException异常. stat参数是个传出参数, getData方法会将path node的状态信息设置到该参数中.

    更新node中关联的数据

    ZooKeeper对象的setData方法用于更新node关联的数据.

    1.  Stat setData(final String path, byte data[], int version);  

    data为待更新的数据. version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version-1则忽略版本检查.返回path node的状态信息.

    删除znode

    ZooKeeper对象的delete方法用于删除znode.

    1.  void delete(final String path, int version);  

    version参数的作用同setData方法.

    其余接口

    请查看ZooKeeper对象的API文档.

    需要注意的几个地方

    ·         znode中关联的数据不能超过1M. zookeeper的使命是分布式协作, 而不是数据存储.

    ·         getChildren, getData, exists方法可指定是否监听相应的事件. create, delete, setData方法则会触发相应的事件的发生.

    ·         以上介绍的几个方法大多存在其异步的重载方法, 具体请查看API说明.

     

    相关资源:敏捷开发V1.0.pptx
    最新回复(0)