【笔记】从 Paxos 到 Zookeeper:第七章 Zookeeper 技术内幕之客户端

    xiaoxiao2025-06-21  19

    文章目录

    系统模型数据模型节点特性版本-保证分布式数据原子性操作Watcher-数据变更通知ACL-保障数据安全 序列化与协议序列化通信协议 客户端一次会话的创建过程初始化阶段会话创建阶段响应处理阶段 服务器地址列表ClientCnxn:网络 I/O 会话

    系统模型

    这一节首先从数据模型、节点特性、版本、Watcher 和 ACL 五方面来讲述 Zookeeper 的系统模型。

    数据模型

    ZooKeeper 的视图结构和 Unix 文件系统非常类似,但没有引入传统文件系统中目录和文件的概念,而是使用了特有的“数据节点”概念,称为 ZNode。ZNode 是 Zookeeper 中数据的最小单元,每个 ZNode 上都可以保存数据,同时还可以挂载子节点,因此构成了一个层次化的命名空间,可以称之为“树”。

    广义上说,事务是应用程序中一系列操作的集合。狭义上的数据库中的事务一般包含了一系列对数据库有序的读写操作,数据库事务通常具备 ACID 特性。在 ZooKeeper 中事务是指能够改变 ZooKeeper 服务器状态的操作,一般包括数据节点创建、删除、更新和客户端会话创建于失效等操作。对于每一个事务请求,ZooKeeper 会为其分配一个全局唯一的事务 ID,称为 ZXID,通常是一个64位的数字。

    节点特性

    ZooKeeper 中每个节点都是有生命周期的,其生命周期的长短取决于数据节点的节点类型。在 ZooKeeper 中,节点类型可以分为:持久节点 PERSISTENT、临时节点 EPHEMERAL、顺序节点 SEQUENTIAL 三大类,经过组合使用可以产生四种组合型节点类型:

    持久节点 PERSISTENT:一旦被创建,会一直存在于 ZooKeeper 中。持久顺序节点 PERSISTENT_SEQUENTIAL:这类节点既有持久性,也有顺序性。顺序性是指 ZooKeeper 会自动为每个给定节点名加上一个数字后缀,作为一个新的完整的节点名。临时节点 EPHEMERAL:其生命周期和客户端的会话绑定在一起,如果客户端会话失效,该节点就会被自动删除。这里指的客户端会话失效,并非简单的 TCP 连接失效,具体将在后续章节介绍。临时顺序节点 EPHEMERAL_SEQUENTIAL:既有临时节点的特征,也有顺序节点的特征。

    ZooKeeper 上每个数据节点除了存储数据内容之外,还存储了数据节点本身的一些状态信息,这些状态信息对应原生 API 里的 Stat 类:

    public class Stat implements Record { private long czxid; // 节点被创建时的事务 ID private long mzxid; // 节点最近一次更新的事务 ID private long ctime; // 创建时间 private long mtime; // 最近更新时间 private int version; // 版本号 private int cversion; // 子节点的版本号 private int aversion; // 节点的 ACL 版本号 private long ephemeralOwner; // 创建该临时节点的会话的 sessionID,如果是持久节点,其值为 0 private int dataLength; // 数据内容长度 private int numChildren;// 子节点个数 private long pzxid; // 子节点列表最后一次被修改的事务 ID,不包含子节点内容的修改 }

    版本-保证分布式数据原子性操作

    ZooKeeper 为数据节点引入了版本的概念,每个数据节点都具有三种类型的版本信息,对数据节点的任何变更都会引起版本号的变化。version 从 0 开始,表示“节点自创建后,其内容被更新过 0 次”,即使更新时内容本身没有变化,version 值也会发生变化。

    version 属性最大的用处是用来实现乐观锁机制中的“写入校验”,在 ZooKeeper 原生 API setData 方法里就有一个 version 参数。

    Watcher-数据变更通知

    一个典型的发布订阅系统能够让多个同时监听某一主题对象,当这个主题对象自身发生变化时,通知所有订阅者。ZooKeeper 中引入了 Watcher 机制来实现这种分布式的通知功能。ZooKeeper 允许客户端向服务端注册一个 Watcher 监听,当指定事件触发 Watcher 时,服务端会向客户端发送一个事件通知。

    ZooKeeper 的 Watcher 机制主要包括客户端线程、客户端 WatchManager 和 ZooKeeper 服务器三个部分。客户端在向 ZooKeeper 服务器注册 Watcher 的同时,会把 Watcher 存储在客户端的 WatchManager 中。当服务端触发 Watcher 事件后,会向客户端发送通知,客户端线程从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。

    接口类 Watcher 用于表示一个标准的事件处理器,其定义了事件通知相关逻辑,包含 KeeperState 和 EventType 两个枚举类,分别代表了通知状态和事件类型。同时,也定义了回调方法 process。

    public interface Watcher { void process(WatchedEvent var1); public interface Event { public static enum EventType { None(-1), NodeCreated(1), // None 之外的事件类型,只在 KeeperState 为 SyncConnected 时出现 NodeDeleted(2), NodeDataChanged(3), NodeChildrenChanged(4); private final int intValue; private EventType(int intValue) { this.intValue = intValue; } } public static enum KeeperState { /** @deprecated */ @Deprecated Unknown(-1), Disconnected(0), /** @deprecated */ @Deprecated NoSyncConnected(1), SyncConnected(3), AuthFailed(4), ConnectedReadOnly(5), SaslAuthenticated(6), Expired(-112); private final int intValue; private KeeperState(int intValue) { this.intValue = intValue; } } } }

    AuthFailed 事件的触发条件并不简单是因为没有权限,一般是因为采用了错误的权限 Scheme。

    回调方法 process 只有一个类型为 WatchedEvent 的参数,该类有三个属性:状态、事件类型、节点路径。

    public class WatchedEvent { private final KeeperState keeperState; private final EventType eventType; private String path; }

    客户端在发送 Watch 信息给服务端时,并没有把 Watcher 对象发送过去,只是发送了 watcher 标志位置为了 true。服务端收到请求后,判断如果 watcher 为 true,就把代表客户端服务端连接的对象 ServerCnxn 和节点路径 Path 的映射关系放入 WatcherManager 中,也就是只是把需要 watcher 的连接存了起来。

    ACL-保障数据安全

    在 Unix 文件系统中,使用的是 UGO 权限控制机制,UGO 就是针对一个文件或目录,对创建者(User)、创建者所在的组(Group)、其他用户(Other)分别配置不同的权限。

    在 ZooKeeper 中使用的权限控制方式叫 ACL,即访问控制列表,是一种新颖的、更细粒度的权限控制方式。ACL 机制有三个概念:权限模式(Scheme)、授权对象(ID)、权限(Permission),通常使用“scheme?permission”来表示一个有效的 ACL 信息。

    权限模式用来确定权限验证中使用的检验策略,有以下四种:

    IP:支持精确的“IP:192.168.1.1”,也支持按网段配置"IP:192.168.1.1/24"。Digest:最常用,格式类似于"username:password"。World:最开放的权限模式,可以看做是特殊的 Digest 模式,权限标识固定为“world:anyone”。Super:也就是超级用户,也是类似特殊的 Digest 模式,在服务启动时,可以在启动参数里指定超级用户的权限标识。

    授权对象 ID 指的是权限赋予的用户或一个指定实体,例如 IP 地址或机器。在不同的权限模式下,授权对象是不同的,IP 权限模式下授权对象是 IP,其他权限模式下授权对象都是权限标识“username:password”。

    权限是指通过权限检查后运行执行的操作,在 ZooKeeper 中有五类:

    CREATE©:子节点的创建权限。DELETE(D):子节点的删除权限。READ®:当前节点的内容权限,和子节点列表权限。WRITE(W):数据节点的更新权限。ADMIN(A):数据节点的管理权限,允许授权对象对该数据节点进行 ACL 相关的设置操作。

    以下命令的含义是,权限模式为 Digest,把数据节点 /test 的全部权限 cdrwa 授予授权对象“wkp:NrLAZ6FuRnaPGI93r1uPKD67MLw=”。

    setAcl /test digest:wkp:NrLAZ6FuRnaPGI93r1uPKD67MLw=:cdrwa

    尽管 ZooKeeper 已经为我们提供了上述的四种权限模式,同时也提供给我们能够自定义自己权限的方式——实现接口 org.apache.zookeeper.server.auth.AuthenticationProvider,并在启动参数或配置文件里配置接口实现即可。

    - 启动参数配置 在ZooKeeper启动参数中配置类似于如下的系统属性 -Dzookeeper.authProvider.1=com.zkbook.CustomAuthenticationProvider - 配置文件配置 在zoo.cfg配置文件中配置类似于如下的配置项 authProvider.1=com.zkbook.CustomAuthenticationProvider

    序列化与协议

    序列化

    Jute 是 ZooKeeper 中的序列化组件,几乎没有其他组件使用,下面简单讲讲其序列化方法和原理。

    每一个要使用 Jute 序列化的类,都需要实现 Record 接口,其有两个方法:serialize 和 deserialize。下面是一个实现了 Record 接口的 Bean 示例:

    public interface Record { public void serialize(OutputArchive archive, String tag) throws IOException; public void deserialize(InputArchive archive, String tag) throws IOException; } public class TestBean implements Record { private int intV; private String stringV; public TestBean() { } public TestBean(int intV, String stringV) { this.intV = intV; this.stringV = stringV; } @Override public void deserialize(InputArchive archive, String tag) throws IOException { archive.startRecord(tag); this.intV = archive.readInt("intV"); this.stringV = archive.readString("stringV"); archive.endRecord(tag); } @Override public void serialize(OutputArchive archive, String tag) throws IOException { archive.startRecord(this, tag); archive.writeInt(intV, "intV"); archive.writeString(stringV, "stringV"); archive.endRecord(this, tag); } }

    对 TestBean 进行序列化和反序列化的方法是:

    public class BinaryTest1 { public static void main(String[] args) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); new TestBean(1, "testbean1").serialize(boa, "tag1"); byte array[] = baos.toByteArray(); ByteArrayInputStream bais = new ByteArrayInputStream(array); BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); TestBean newBean1 = new TestBean(); newBean1.deserialize(bia, "tag1"); System.out.println("intV = " + newBean1.getIntV() + ",stringV = " + newBean1.getStringV()); bais.close(); baos.close(); } }

    我们可以看到 TestBean 实现了 Record 的两个方法,其序列化主要是使用了 InputArchive 和 OutputArchive,它两有多种实现,最常用的是 BinaryOutputArchive 和 BinaryInputArchive。在上面的代码里,序列化时,便使用了 BinaryOutputArchive,看下面的源码可知,其底层其实是把数据输出到了 ByteArrayOutputStream。

    public class BinaryOutputArchive implements OutputArchive { private ByteBuffer bb = ByteBuffer.allocate(1024); private DataOutput out; public static BinaryOutputArchive getArchive(OutputStream strm) { return new BinaryOutputArchive(new DataOutputStream(strm)); } /** Creates a new instance of BinaryOutputArchive */ public BinaryOutputArchive(DataOutput out) { this.out = out; } public void writeByte(byte b, String tag) throws IOException { out.writeByte(b); } public void writeBool(boolean b, String tag) throws IOException { out.writeBoolean(b); } public void writeInt(int i, String tag) throws IOException { out.writeInt(i); } public void writeLong(long l, String tag) throws IOException { out.writeLong(l); } public void writeFloat(float f, String tag) throws IOException { out.writeFloat(f); } public void writeDouble(double d, String tag) throws IOException { out.writeDouble(d); } /** * create our own char encoder to utf8. This is faster * then string.getbytes(UTF8). * @param s the string to encode into utf8 * @return utf8 byte sequence. */ final private ByteBuffer stringToByteBuffer(CharSequence s) { bb.clear(); final int len = s.length(); for (int i = 0; i < len; i++) { if (bb.remaining() < 3) { ByteBuffer n = ByteBuffer.allocate(bb.capacity() << 1); bb.flip(); n.put(bb); bb = n; } char c = s.charAt(i); if (c < 0x80) { bb.put((byte) c); } else if (c < 0x800) { bb.put((byte) (0xc0 | (c >> 6))); bb.put((byte) (0x80 | (c & 0x3f))); } else { bb.put((byte) (0xe0 | (c >> 12))); bb.put((byte) (0x80 | ((c >> 6) & 0x3f))); bb.put((byte) (0x80 | (c & 0x3f))); } } bb.flip(); return bb; } public void writeString(String s, String tag) throws IOException { if (s == null) { writeInt(-1, "len"); return; } ByteBuffer bb = stringToByteBuffer(s); writeInt(bb.remaining(), "len"); out.write(bb.array(), bb.position(), bb.limit()); } public void writeBuffer(byte barr[], String tag) throws IOException { if (barr == null) { out.writeInt(-1); return; } out.writeInt(barr.length); out.write(barr); } public void writeRecord(Record r, String tag) throws IOException { r.serialize(this, tag); } public void startRecord(Record r, String tag) throws IOException {} public void endRecord(Record r, String tag) throws IOException {} }

    BinaryInputArchive 也是类似的,就不细讲了。

    通信协议

    基于 TCP/IP 协议,Zookeeper 实现了自己的通信协议来玩按成客户端与服务端、服务端与服务端之间的网络通信,对于请求,主要包含请求头和请求体,对于响应,主要包含响应头和响应体。

    对于请求协议而言,如下为获取节点数据请求的完整协议定义:

    图里的 bit offset,其实应该是字节 byte offset,因为 4 个 bit 位肯定不能完全表达数据长度。其中 xid 用于记录客户端请求发起的先后序号,用来确保单个客户端请求的响应顺序,type 代表请求的操作类型,如创建节点(OpCode.create)、删除节点(OpCode.delete)、获取节点数据(OpCode.getData)。

    协议的请求主体内容部分,包含了请求的所有操作内容,不同的请求类型请求体不同。对于会话创建而言,其请求体如下:

    class ConnectRequest { int protocolVersion; long lastZxidSeen; int timeOut; long sessionId; buffer passwd; }

    Zookeeper 客户端和服务器在创建会话时,会发送 ConnectRequest 请求,该请求包含协议版本号 protocolVersion、最近一次接收到服务器 ZXID lastZxidSeen、会话超时时间 timeOut、会话标识 sessionId 和会话密码 passwd。

    对于响应协议而言,如下为获取节点数据响应的完整协议定义:

    xid 与请求头中的 xid 一致,zxid 表示 Zookeeper 服务器上当前最新的事务 ID,err 则是一个错误码,表示当请求处理过程出现异常情况时,就会在错误码中标识出来,常见的包括处理成功(Code.OK)、节点不存在(Code.NONODE)、没有权限(Code.NOAUTH)。

    协议的响应主体内容部分,包含了响应的所有数据,不同的响应类型请求体不同。对于会话创建而言,其响应体如下:

    class ConnectResponse { int protocolVersion; int timeOut; long sessionId; buffer passwd; }

    针对客户端的会话创建请求,服务端会返回客户端一个 ConnectResponse 响应,该响应体包含了版本号 protocolVersion、会话的超时时间 timeOut、会话标识 sessionId 和会话密码 passwd。

    客户端

    客户端是开发人员使用 ZooKeeper 最主要的途径,主要由以下几个核心组件构成:

    ZooKeeper 实例:客户端的入口。ClientWatchManager:客户端 Watcher 管理器。HostProvider:客户端地址列表管理器。ClientCnxn:客户端核心线程,其内部包含两个线程:SendThread 和 EventThread。前者是一个 I/O 线程,负责 ZooKeeper 客户端和服务器之间的网络 I/O 通信;后者是一个事件线程,主要负责对服务端事件进行处理。

    一次会话的创建过程

    初始化阶段

    初始化Zookeeper对象。调用Zookeeper的构造方法来实例化一个Zookeeper,在初始化过程中,会创建一个客户端的Watcher管理器:ClientWatchManager。设置会话默认Watcher。如果在构造方法中传入一个Watcher对象,那么客户端会将这个对象作为默认Watcher保存在ClientWatchManager。构造Zookeeper服务器地址列表管理器:HostProvider。在构造方法中传入的服务器地址,客户端会将其存放在服务器地址列表管理器HostProvider中。创建并初始化客户端网络连接器:ClientCnxn。Zookeeper客户端首先会创建一个网络连接器ClientCnxn。用来管理客户端与服务器的网络交互。另外,客户端在创建ClientCnxn的同时,还会初始化客户端两个核心队列outgoingQueue和pendingQueue,分别作为客户端的请求发送队列和服务器端响应的等待队列。初始化SendThread和EventThread。客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务端之间的所有网络I/O,后者则用于进行客户端的事件处理。同时,客户端还会将ClientCnxnSocket分配给SendThread作为底层网络I/O处理器,并初始化EventThread的待处理事件队列waitingEvents,用于存放所有等待被客户端处理的事情。

    会话创建阶段

    启动SendThread和EventThread。SendThread首先会判断当前客户端的状态,进行一系列请理性工作,为客户端发送“会话创建”请求做准备。获取一个服务器地址。在开始创建TCP之前,SendThread首先需要获取一个Zookeeper服务器的目标地址, 这通常是从HostProvider中随机获取出一个地址,然后委托给ClientCnxnSocket去创建与Zookeeper服务器之间的TCP连接。创建TCP连接。获取一个服务器地址后,ClientCnxnSocket负责和服务器创建一个TCP长连接。构造ConnectRequest请求。在TCP连接创建完毕后,可能有的读者会认为,这样是否就说明已经和Zookeeper服务器完成连接了呢?其实不然,上面的步骤只是纯粹地从网络TCP层完成了客户端与服务端之间的Socket连接,但远未完成Zookeeper客户端的会话创建。SendThread会负责根据当前客户端的实际设置,构造出一个ConnectRequest请求,该请求代表了客户端试图与服务端创建一个会话。同时,Zookeeper客户端还会进一步将该请求包装成网络I/O层的Packet对象,放入发送队列outgoingQueue中去。发送请求。当客户端请求准备完毕后,就可以开始向服务端发送请求了。ClientCnxnSocket负责从outgoingQueue中取出一个待发送的Packet对象,将其序列化成ByteBuffer后,向服务端进行发送。

    响应处理阶段

    接受服务器端响应。ClientCnxnSocket接受到服务端响应后,会首先判断当前的客户端状态是否是“已初始化”,如果尚未完成初始化,那么就认为该响应一定是会话创建请求的响应,直接交由readConnectResult方法来处理该响应。处理Response。ClientCnxnSocket会对接受到的服务端响应进行反序列化,得到ConnectResponse对象,并从中获取到Zookeeper服务端分配的会话SessionId。连接成功。连接成功后,一方面需要通知SendThread线程,进一步对客户端进行会话参数的设置,包括readTimeout和connectTimeout等,并更新客户端状态,另一方面,需要通知地址管理器HostProvider当前成功连接的服务器地址。生成时间:SyncConnected-None。为了能够让上层应用感知到会话的成功创建,SendThread会生成一个事件SyncConnected-None,代表客户端与服务器会话创建成功,并将该事件传递给EventThread线程。查询Watcher。EventThread线程收到事件后,会从ClientWatchManager管理器中查询出对应的Watcher,针对SyncConnected-None事件,那么就直接找出存储的默认Watcher,然后将其放到EventThread的watingEvents队列中去。处理事件。EventThread不断的从watingEvents队列中取出待处理的Watcher对象,然后直接调用该对象的process接口方法,以达到触发Watcher的目的。

    服务器地址列表

    在使用ZooKeeper构造方法时,用户传入的ZooKeeper服务器地址列表,即connectString参数,通常是这样一个使用英文状态逗号分隔的多个IP地址和端口的字符串:

    192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181

    在 3.2.0 之后的新版本客户端里,也增加了命名空间特性。如果一个 ZooKeeper 客户端配置了 Chroot,那么客户端对服务器的任何操作,都将会被限制在自己的命名空间下。

    192.168.0.1:2181,192.168.0.1:2181,192.168.0.1:2181/namespace

    ZooKeeper客户端允许我们将服务器的所有地址都配置在一个字符串上,于是一个问题就来了:ZooKeeper客户端在连接服务器的过程中,是如何从这个服务器列表中选择服务器机器的呢?是按序访问,还是随机访问呢?

    在 ZooKeeper 中,是通过 HostProvider 来提供地址管理的,有个 next 方法用于获取下一个要访问的地址。其默认实现是 StaticHostProvider,next 方法的逻辑是先将地址随机打乱,组成一个循环队列,然后一直按序访问。

    public interface HostProvider { public int size(); public InetSocketAddress next(long spinDelay); public void onConnected(); }

    默认实现的 StaticHostProvider 逻辑比较简单,可以考虑对其进行扩展:

    动态变更的地址列表:ZooKeeper 服务器集群的整体迁移或个别变更,会导致大量客户端应用也跟着一起变更。出现这个尴尬局面的本质原因是因为我们把可能动态变更的地址列表写死在程序中了,因此实现动态变更的地址列表管理,对于提升 ZooKeeper 客户端使用体验非常友好。同机房优先策略:对于大规模的分布式系统,跨机房延迟会严重影响系统性能,实现同机房优先策略很有必要。

    ClientCnxn:网络 I/O

    ClientCnxn 是 ZooKeeper 客户端的核心工作类,负责维护客户端和服务端之间的网络连接,并进行一系列通信。ClientCnxn 有下面这些属性:

    ClientCnxn 中有两个队列,分别是代表客户端的请求发送队列 outgoingQueue 和服务端响应的等待队列 pendingQueue。负责进行命名空间隔离的 chrootPath。管理客户端和服务端所有网络 I/O 操作的 sendThread。负责客户端所有事件处理,触发 watcher 和异步 callback 的 eventThread。 // final String chrootPath; final SendThread sendThread; final EventThread eventThread; private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();

    可以看到队列里元素类型是 Packet,Packet 是对协议层的一个封装,看起来属性很多,但实际上只有 requestHeader、request、readOnly 会传递到服务端。

    static class Packet { RequestHeader requestHeader; // 请求头 ReplyHeader replyHeader; // 响应头 Record request; // 请求体 Record response; // 响应体 ByteBuffer bb; /** Client's view of the path (may differ due to chroot) **/ String clientPath; // 节点路径 /** Servers's view of the path (may differ due to chroot) **/ String serverPath; boolean finished; AsyncCallback cb; Object ctx; WatchRegistration watchRegistration; // 注册的 watcher public boolean readOnly; }

    会话

    会话是 ZooKeeper 中最重要的概念之一,客户端与服务端之间的任何交互操作都与会话息息相关。

    会话状态包括 CONNECTING、CONNECTED、RECONNECTING、RECONNECTED、CLOSED 等。

    Session是Zookeeper中的会话实体,代表了一个客户端会话,其包含了如下四个属性:

    sessionID。会话ID,唯一标识一个会话,每次客户端创建新的会话时,Zookeeper都会为其分配一个全局唯一的sessionID。TimeOut。会话超时时间,客户端在构造Zookeeper实例时,会配置sessionTimeout参数用于指定会话的超时时间,Zookeeper客户端向服务端发送这个超时时间后,服务端会根据自己的超时时间限制最终确定会话的超时时间。TickTime。下次会话超时时间点,为了便于Zookeeper对会话实行"分桶策略"管理,同时为了高效低耗地实现会话的超时检查与清理,Zookeeper会为每个会话标记一个下次会话超时时间点,其值大致等于当前时间加上TimeOut。isClosing。标记一个会话是否已经被关闭,当服务端检测到会话已经超时失效时,会将该会话的isClosing标记为"已关闭",这样就能确保不再处理来自该会话的心情求了。

    Zookeeper 为了保证请求会话的全局唯一性,在 SessionTracker 初始化时,调用 initializeNextSession 方法生成一个 sessionID,之后在 Zookeeper 运行过程中,会在该 sessionID 的基础上为每个会话进行分配,初始化算法如下:

    public static long initializeNextSession(long id) { long nextSid = 0; // 无符号右移8位使为了避免左移24后,再右移8位出现负数而无法通过高8位确定sid值 nextSid = (System.currentTimeMillis() << 24) >>> 8; nextSid = nextSid | (id << 56); return nextSid; }

    其中的 id 表示配置在 myid 文件中的值,通常是一个整数,如1、2、3。该算法的高 8 位确定了所在机器,后 56 位使用当前时间的毫秒表示进行随机。SessionTracker 是 Zookeeper 服务端的会话管理器,负责会话的创建、管理和清理等工作。

    Zookeeper的会话管理主要是通过SessionTracker来负责,其采用了分桶策略(将类似的会话放在同一区块中进行管理)进行管理,以便Zookeeper对会话进行不同区块的隔离处理以及同一区块的统一处理。

    Zookeeper将所有的会话都分配在不同的区块一种,分配的原则是每个会话的下次超时时间点(ExpirationTime)。ExpirationTime指该会话最近一次可能超时的时间点。同时,Zookeeper Leader服务器在运行过程中会定时地进行会话超时检查,时间间隔是ExpirationInterval,默认为tickTime的值,ExpirationTime的计算时间如下

      ExpirationTime = ((CurrentTime + SessionTimeOut) / ExpirationInterval + 1) * ExpirationInterval

    会了保持客户端会话的有效性,客户端会在会话超时时间过期范围内向服务端发送PING请求来保持会话的有效性(心跳检测)。同时,服务端需要不断地接收来自客户端的心跳检测,并且需要重新激活对应的客户端会话,这个重新激活过程称为TouchSession。

    当SessionTracker的会话超时线程检查出已经过期的会话后,就开始进行会话清理工作,大致可以分为如下七步。

    标记会话状态为已关闭。由于会话清理过程需要一段时间,为了保证在此期间不再处理来自该客户端的请求,SessionTracker会首先将该会话的isClosing标记为true,这样在会话清理期间接收到该客户端的心情求也无法继续处理了。发起会话关闭请求。为了使对该会话的关闭操作在整个服务端集群都生效,Zookeeper使用了提交会话关闭请求的方式,并立即交付给PreRequestProcessor进行处理。收集需要清理的临时节点。一旦某个会话失效后,那么和该会话相关的临时节点都需要被清理,因此,在清理之前,首先需要将服务器上所有和该会话相关的临时节点都整理出来。Zookeeper在内存数据库中会为每个会话都单独保存了一份由该会话维护的所有临时节点集合,在Zookeeper处理会话关闭请求之前,若正好有以下两类请求到达了服务端并正在处理中。 节点删除请求,删除的目标节点正好是上述临时节点中的一个。临时节点创建请求,创建的目标节点正好是上述临时节点中的一个。对于第一类请求,需要将所有请求对应的数据节点路径从当前临时节点列表中移出,以避免重复删除,对于第二类请求,需要将所有这些请求对应的数据节点路径添加到当前临时节点列表中,以删除这些即将被创建但是尚未保存到内存数据库中的临时节点。 添加节点删除事务变更。完成该会话相关的临时节点收集后,Zookeeper会逐个将这些临时节点转换成"节点删除"请求,并放入事务变更队列outstandingChanges中。删除临时节点。FinalRequestProcessor会触发内存数据库,删除该会话对应的所有临时节点。移除会话。完成节点删除后,需要将会话从SessionTracker中删除。关闭NIOServerCnxn。最后,从NIOServerCnxnFactory找到该会话对应的NIOServerCnxn,将其关闭。

    当客户端与服务端之间的网络连接断开时,Zookeeper 客户端会自动进行反复的重连,直到最终成功连接上 Zookeeper 集群中的一台机器。

    最新回复(0)