使用Zookeeper进行开发

    xiaoxiao2026-02-21  22

    使用Zookeeper的API

    1 建立Zookeeper会话

    ZookeeperAPI围绕Zookeeper的会话而构建 每个API的调用都会传递该会话 如果会话被断开 会话会迁移到另一个台Zookeeper服务器上 只要会话还存活 该会话所代表的handle就继续有效 Zookeeper客户端会持续的保持该句柄的连接

    /** 主要参数 connectString 包含主机名和端口号 sessionTimeout 以毫秒为单位 表示Zookeeper等待客户端维持会话而发起通信的最长等待时间 如果超过该时间未得到客户端请求 服务端就会关闭会话 watcher 用于介绍会话事件的一个对象 该对象需要自己创建 客户端通过该watcher来监控与服务器的间会话的健康状态 也可以监控数据变化 **/ ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    1.1 创建监视器实例
    /**Watcher接口**/ public interface Watcher { void process(WatchedEvent var1); } /** 连接到Zookeeper后 后台会有一个线程维护该会话 该线程为守护线程 也就是说线程即使处于活跃状态 程序也会退出 **/ class Master implements Watcher{ ZooKeeper zooKeeper; String hostPort; public Master(ZooKeeper zooKeeper, String hostPort) { this.zooKeeper = zooKeeper; this.hostPort = hostPort; } void startZk() throws IOException{ zooKeeper = new ZooKeeper(hostPort,15000,this); } @Override public void process(WatchedEvent watchedEvent) { /**只是简单打印事件**/ System.out.println(watchedEvent); } }
    2 获取管理权

    这里将通过Zookeeper实现一个简单的主节点选举算法 所有候选主节点进程尝试创建/master节点 但只有一个成功 该成功进程成为主节点

    /** 该实现尝试创建znode节点/master 如果znode节点存在 则失败 同时在/master节点上存储服务器的唯一ID 所创建的节点类型为临时节点 当创建它的会话关闭或无效时 Zookeeper会自动检测到 并删除节点 **/ zooKeeper.create("/master",Integer.toHexString(new Random().nextInt()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

    ️ create方法会抛出两种异常 KeeperException和InterruptedException 需求确保处理这两种异常 特别是KeeperException的子类ConnectionLossException 对于这两种异常 create方法可能已经成功 故需要主节点捕获并处理它们

    ConnectionLossException 该异常发生于客户端与服务端失去连接时 常常由于网络原因导致 该异常发生时 客户端无法获悉是在服务器处理前丢失了请求消息 还是在处理后客户端未收到响应消息 虽然客户端将会为后续请求重新建立连接 但进程必须知道一个未决请求是否被处理了 InterruptedException 该异常源于客户端线程的中断请求 也就是进程会中断本地客户端的请求处理过程 使请求处于未知状态

    这两种请求都将导致正常处理过程的中断 开发者无法假设处理过程中的请求状态

    /** path 想要获取数据的znode节点路径 watch 表示是否想要监听后续的数据变更 如果为true 可通过创建Zookeeper句柄时设置的Watcher对象得到事件 同时另一个版本提供Watcher对象为参数 通过该传入的对象接收变更事件 stat 最后一个参数类型stat结构 getData方法填充znode节点的元数据信息 **/ byte[] getData(String path, boolean watch, Stat stat) boolean checkMaster() throws Exception{ while (true){ try{ Stat stat = new Stat(); //获取/master节点数据检测活动主节点 byte data[] = zooKeeper.getData("/master",false,stat); //如果master存在 使用其中数据确定谁是群首 如果一个进程捕获到ConnectionLossException 该进程可能就是主节点 isLeader = new String(data).equals(serverId); }catch (KeeperException.NoNodeException e){ return false; }catch (KeeperException.ConnectionLossException e){ } } } void runForMaster() throws Exception{ while (true){ try{ //执行成功成为活动主节点 zooKeeper.create("/master",serverId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); isLeader=true; break; }catch (KeeperException.NodeExistsException e){ isLeader = false; break; }catch (KeeperException.ConnectionLossException e){ //如果发生该异常 并不会终止函数 这样就可以继续执行检测主节点是否存在 否则重试 } if(checkMaster()) break; } }
    3 异步调用

    Zookeeper中所有同步调用方法都有对应的异步调用方法 通过异步调用 可在单线程中同时进行多个调用 也可以简化开发

    //异步方法与同步方法非常类似 仅多了两个参数 //提供回调方法的对象 //上下文信息 create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)

    方法调用后立即返回 回调对象通过传入的上下文参数获取数据

    //回调对象实现只有一个方法的StringCallback接口 //rc 返回调用结构 返回OK或与KeeperException异常对应的编码值 //path 传给create的Path参数 //ctx 上下文信息 //name 创建znode节点名称 AsyncCallback.StringCallback masterCreateCallback = new AsyncCallback.StringCallback() { @Override public void processResult(int rc, String s, Object o, String s1) { switch (KeeperException.Code.get(i)){ case CONNECTIONLOSS:checkMaster();return; case OK:isLeader=true;break; default:isLeader=false; } } }; void runForMaster() { zooKeeper.create("/master",serverId.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL,masterCreateCallback,null); }
    3.1 设置元数据
    AsyncCallback.StringCallback createCallback=new AsyncCallback.StringCallback() { @Override public void processResult(int i, String s, Object o, String s1) { switch (KeeperException.Code.get(i)){ //如果回调函数中得到连接丢失的返回码 通过调用createPath方法来对create操作进行重试 case CONNECTIONLOSS:createParent(s,(byte[])o); case OK:break; case NODEEXISTS:break; default: } } }; void createParent(String path,byte[] data){ zooKeeper.create(path,data,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT,createCallback,data); } public void bootStrap(){ createParent("/workers",new byte[0]); createParent("/tasks",new byte[0]); createParent("/status",new byte[0]); }
    最新回复(0)