zk三之watcher事件通知

    xiaoxiao2022-07-03  122

    watcher事件通知

    package com.test.demo.zktest001; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.concurrent.CountDownLatch; /** * @创建人 yfc * @创建时间 2019\5\22 * @描述 */ public class Zk002 implements Watcher { //zk连接地址 private static final String CONNECTION="127.0.0.1:2181"; //zk会话超时时间 private static final int SESSION_OUTTIME=2000; //信号量、阻塞程序执行,用户等待zookeeper连接成功,发送成功信号 private static final CountDownLatch countDownLatch=new CountDownLatch(1); private ZooKeeper zk; //zk创建连接 public void createConnection(String connection,int sessionTimeOut){ try { zk=new ZooKeeper(connection,sessionTimeOut,this); }catch (Exception e){ e.printStackTrace(); } } //创建持久化节点 public boolean createNode(String path,String data){ try { //开启事件监听,判断节点是否存在,如果存在返回节点的状态信息,如果不存在,返回null exists(path,true); //判断节点是否存在 String result=zk.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("新增节点成功,result:"+result); }catch (Exception e){ e.printStackTrace(); } return true; } //修改节点信息 public boolean update(String path,String data){ try { exists(path,true); //后面的数值指代的是版本号,如果 version 为 -1 怎可以匹配任何版本 Stat result=zk.setData(path,data.getBytes(),-1); System.out.println("result:"+result); }catch (Exception e){ e.printStackTrace(); } return true; } //事件通知 @Override public void process(WatchedEvent event) { //1.获取节点状态 Event.KeeperState state=event.getState(); //2.获取几点路径 String path=event.getPath(); //3.获取事件类型 Event.EventType type=event.getType(); //4.判断连接状态 if(Event.KeeperState.SyncConnected==state){ //表示事件为连接状态,EventType.None表示建立连接成功 if(Event.EventType.None==type){ System.out.println("***********开启连接************"); //信号量减1,只有当信号量的值为0时,才能不产生,阻塞 countDownLatch.countDown(); }else if(Event.EventType.NodeCreated==type){ System.out.println("事件通知,新增node节点"); }else if(Event.EventType.NodeDataChanged==type){ System.out.println("当前节点被修改"); } } } //关闭zk连接 public void close(){ try { if (zk!=null){ zk.close(); } }catch (Exception e){ } } //是否开启事件监听 public Stat exists(String path,boolean isWatch){ try{ //为true就开启事件监听 zk.exists(path,isWatch); }catch (Exception e){ e.printStackTrace(); } return null; } public static void main(String[] args) { Zk002 zk002=new Zk002(); zk002.createConnection(CONNECTION,SESSION_OUTTIME); //zk002.createNode("/demo/test4","test4"); zk002.update("/demo/test4","test004"); zk002.close(); } }

     

    最新回复(0)