Zookeeper

    xiaoxiao2022-06-25  174

    讲了zookeeper的简单的介绍,以及环境搭建,还有zkClient的使用,基本上很简单,作为HelloWorld,今天继续往下走, 既然已经把简介和环境搭建完了,然后一会来说说配置,配置也没有什么说太多的,那我这里有一个配置的文档,发成PDF 的形式发给你了,就是有一个zookeeper的配置文件,配置文件基本上也就这几个,首先有一个tickTime,就是客户端和 服务器端维持心跳的一个时间间隔,他这个默认也是有一个时间的,你自己看看默认的配置,dataDir就是存储咱们数据的 地方,然后有一个clientPort,客户端的端口,还有一个initLimit,就是说你的客户端,你想连我的zookeeper,如果还没有 连上,就认为失败,总的长度你可以自己去指定,syncLimit,就是leader和follower之间的时间间隔,之前发消息的时间间隔, 他不能够超过=多少时间长度,就挂了,server.0,server.1,server.2,server.3,一共有四个参数需要注意一下,A表示是第几号 服务器,B是IP地址了,C是2888,2888他指的是什么啊,C是这个服务器与集群中的leader服务器交换信息的端口,2888是进行通信的 端口号,然后3888,后面的D,表示什么啊,D就表示一个选举了,一个leader如果挂了话,就需要另外的进行选举,就是一个新的leader, 就是3888,基本上配置很简单,你可以自己看一下,或者上网找点资料

    (一)Zookeeper基础知识、体系结构、数据模型 1 zookeeper是一个类似hdfs的树形文件结构,zookeeper可以用来保证数据在(zk)集群之间的数据的事务性一致、 2 zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher 3 zookeeper有三个角色:Learner,Follower,Observer 4 zookeeper应用场景: 统一命名服务(Name Service) 配置管理(Configuration Management) 集群管理(Group Membership) 共享锁(Locks) 队列管理 (二) Zookeeper配置(搭建zookeeper服务器集群) 1.1 结构:一共三个节点 (zk服务器集群规模不小于3个节点),要求服务器之间系统时间保持一致。 1.2 上传zk 进行解压: tar zookeeper-3.4.5.tar.gz 重命名: mv zookeeper-3.4.5 zookeeper 修改环境变量: vi /etc/profile export ZOOKEEPER_HOME=/usr/local/zookeeper export PATH=.:$HADOOP_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/... 刷新: source /etc/profile 到zookeeper下修改配置文件 cd /usr/local/zookeeper/conf mv zoo_sample.cfg zoo.cfg 修改conf: vi zoo.cfg 修改两处 (1)dataDir=/usr/local/zookeeper/data (2)最后面添加 server.0=bhz:2888:3888 server.1=hadoop1:2888:3888 server.2=hadoop2:2888:3888 服务器标识配置: 创建文件夹:mkdir data 创建文件myid并填写内容为0:vi myid (内容为服务器标识 : 0) 进行复制zookeeper目录到hadoop01和hadoop02 还有/etc/profile文件 把hadoop01、hadoop02中的myid文件里的值修改为1和2 路径(vi /usr/local/zookeeper/data/myid) 启动zookeeper: 路径:/usr/local/zookeeper/bin 执行:zkServer.sh start (注意这里3台机器都要进行启动) 状态:zkServer.sh status(在三个节点上检验zk的mode,一个leader和俩个follower) 1.3 操作zookeeper (shell) zkCli.sh 进入zookeeper客户端 根据提示命令进行操作: 查找:ls / ls /zookeeper 创建并赋值:create /bhz hadoop 获取:get /bhz 设值:set /bhz baihezhuo 可以看到zookeeper集群的数据一致性 创建节点有俩种类型:短暂(ephemeral) 持久(persistent) (三) zoo.cfg详解: tickTime: 基本事件单元,以毫秒为单位。这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔, 也就是每隔 tickTime时间就会发送一个心跳。 dataDir: 存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。 clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。 initLimit: 这个配置项是用来配置 Zookeeper 接受客户端初始化连接时最长能忍受多少个心跳时间间隔数, 当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息, 那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒。 syncLimit: 这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度, 最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒 server.A = B:C:D : A表示这个是第几号服务器, B 是这个服务器的 ip 地址; C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口; D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader 咱们继续往下走吧,121,122,123,这三个节点启起来以后,然后整体的zkServer.sh start,回车,三个节点我都要提供zookeeper, 然后咱们看一下状态,然后看一下三个节点的状态,可能是启动的并不是那么快,尤其每个节点每台机器只有一个G的内存,内存到是 没有什么变化,还没起来,咱们再来一次,这回起来了

    有的时候你等待的时间比较长,你下次再status的时候再起来了

    这回123是一个leader,然后21和22是一个follower,主从吗,zookeeper起来了以后,cd到/usr/local下,我们可以通过 eclipse来看一下,还是原始默认的情况

    然后咱们启动zookeeper以后呢,之前把shell和环境搭建,刚才其实讲了一下配置,在这里其实都已经写好了

    你可以自己去看一看配置文件, vim /usr/local/zookeeper/conf/zoo.cfg,咱们去看一下,他这里都是有默认时间的, tickTime=2000这个2000是2000毫秒,服务器端和客户端的心跳,还有initLimit=10,都有的,然后还有同步的时间, syncLimit=5,这个更短了,这个是5毫秒,还有其他的,dataDir,clientPort=2181,这个是对外提供服务的一个端口, zookeeper默认的端口就是2181,对外提供的端口,内部提供的端口,一个是做内部通信的,就是121,122,123三者通信就用2888, 如果leader挂了的话,就用3888去做,其实这个配置非常简单

    咱们再往下去走,去看

    如果我想用JAVA去操作zookeeper,你用什么版本都行,咱们现在用的是3.4.5,其实zookeeper有一套原生的API, 然后有一个zkClient,zkClient是在原生API基础上进行扩展的开源JAVA客户端,但是原生有一个API,扩展有第三方的API, 最好用的是Curator API,一点点说吧,首先简单看看JAVA代码吧,我现在用的是3.4.5,直接打开eclipse,咱们这个项目 其实已经准备好了

    现在我就用zookeeper-3.4.5.jar这一个就够了,然后包括zkclient的API,curator的API,假如你的项目现在引入了3.4.5, 然后怎么去用,我这里有这么多demo,一起来看看,首先看一下base,zookeeper下的base

    首先zookeeper实例化这个,这里的构造方法参数肯定都不同,有好多种方式,你会发现这个只是一个比较简单的API, 其实这块参数还有很多,canBeReadOnly只读,然后还有些SessionId,还有些sessionPassword,就是做认证的,还有很多东西, 有不同的构造方法,提供了4种构造方法,他提供了4种构造方法,参数不同,首先connectString这个就不说了,心跳检测的 超时时间,事件处理器,canBeReadOnly当前会话是否只读,sessionId和sessionPasswd,这块其实用的不是很多,就是你这个 zookeeper需要一个sessionId和password,然后用zookeeper连的时候需要做一个认证,然后可以提供一个重复对话,我就 可以用两个client端进行操作,两个client同时可以去登,保证他们两个操作的数据是可以共享的,怎么理解呢,就是两个 客户端共享一个session,他们两个操作是共享的,反正他也提供了这个写法了,我一般用的比较少,然后zookeeper客户端 和服务端会话的建立是一个异步的过程,我们的程序在处理连接的时候,可能是立即返回,连接成功就是咱们看到的那个, synchronized这个状态的时候,才算是真正的连接成功,所以因为异步的创建会话的问题,咱们就得使用CountDownLatch 一个协调了,就是刚才说的,然后继续往下看,创建节点一般是znode,这个方法就是非常简单,create,他提供了两个API, 同步创建,还有一种是异步创建,其实这个API我只是想简单讲一下,我不想讲的太深入,你工作中一般不会用到这套API, 不会用到zkClient,都会用到curator,因为太难用了,这套API

    咱们看一下吧,这是一些参数,直接看代码比较直观,临时节点应该是有一定的时间的,咱们先不说为什么临时节点创建完了 就直接删除了,咱们一会再看吧,API也没有说临时节点创建多久,反正刚才咱们已经看到了,他既然是有返回值,既然run as java 有返回值,zookeeper一定是三台机器,三个节点同步的,数据是同步的,两个follower,一个master,比如说咱们举个例子, 咱们有一个client端C1,再来一个client端C2,咱们知道zk是遵循原子算法的,它是单一视图的,3个节点上的数据是保持一致的, 比如这是根节点/,根节点下有一个/zookeeper节点,然后有一个/testRoot这个节点,然后/testRoot节点下还有个节点 就是/children,现在我做的就是临时的C1,可能开始没有/children,开始没有/children这个节点, 然后我的这个C1客户端,我怎么去实现分布式锁啊,那我就这么去做,当我C1去做一个操作的时候, 也就在某一个节点下,在testRoot这个节点下,建立一个children,建立一个临时的, 临时的节点,然后你其他的数据,总之他创建临时节点是为了干什么,就是为了占领一把锁,然后他就去做修改数据了,我去做一些 其他的数据库操作啊,或者做什么操作也好,做完了这个操作以后,然后再把这个children移除,做完操作什么意思,就是zk已经 close掉了,close掉就相当于结束了,结束了之后呢,这个C2他想操作,才能操作,我快速把这个问题说完,就是他用一个临时的节点 去做一个值,其实实现起来非常的easy,我写个伪代码你们就都懂了,我现在想做一个分布式的操作,咱们这么去理解吧,还是刚才 的zookeeper,这边有3个节点,然后我先做有2个client端,C1变成C2,然后他们想操作同一个数据库,比如同一个数据,或者两个数据库, 都无所谓了,就是说现在有一个问题,什么叫分布式呢,你整体的环境中有好几份应用,就是咱们要开发一个WEB程序,首先 我部署到C1上了,然后我又把同样的程序部署到上了,我生产环境中前端挂一个Nginx,可能我又挂了好几层Nginx, 这个就无所谓了,总之就实现负载均衡了,一会请求他,一会请求他,这个节点其实是我自己的应用,他肯定是一个物理机啊, 自己有对应的JDK,有自己的JVM,对应的有自己的JVM,这边C2也是一样的,他自己也有对应的JVM,你要理解这个事, 然后开发的时候是同一个代码部署到C1和C2上的,那么问题就来了,如果说我当前有一个用户,去访问里面同样的代码, 我数据库好多user,我要访问user,user原先的age年龄,比如25,然后我就要实现在同一个时间点,你不能同一个时间点去减值和 加值,如果并发量非常大的话,当然我这里是分布式数据库,如果是一个数据库无所谓,如果是两个数据库有数据同步的话, 第一个节点访问自己的db,第二个节点也访问自己的DB,然后两个DB是MYSQL的主从,或者ORACLE的做双击热备,这个都是可以的, 那就相当于我这个DB里面有一张user表,userId比如1,我这个userId也是1,我这个age,你随便找一个值吧,这个age原先是25, 这个数据也是25,用户肯定是操作界面了,一堆用户,并发的去访问Nginx,相当于负载在C1机器上,也负载在C2机器上了, 并发就会导致数据不一致的问题,可能就是有一种情况,有n多个请求来对25进行加或者减操作,有可能出现一种情况, 就是重复的加,你当前一个请求过来,看到25,对他进行++了,然后他就变成26了,你进行加加的过程中,可能执行逻辑还没有 加加,最后才加加呢,由于是两个JVM,数据就变成26了,就会有这个问题,可能加重了,要不然减少了,那咱们怎么做啊,是这样的, 我利用这个节点,client端一个请求过来的时候,我先用zookeeper去创建一个临时的节点,这个临时节点可以随意, 临时节点就写一个ID,这个ID就等于1,那这样的意思就是什么啊,当C2想对我数据库user进行操作的时候,C1操作也是, C1想对age加1,C1加1之前先不做其他事,他先去zookeeper上先去create一个node,就是一个临时的znode,它是1,这边也是 想做一个相同的操作,C2肯定也是去create一个znode,C2也想创建ID为1的,你会发现他创建不了,他就先get一遍, 首先是先get,看看zookeeper上有没有id这个节点,id等于1的这个节点,有的话我就不创建就等待,没有的话我就create一个id, 由于zookeeper支持同一个时间点只能有一个client端,并发再并发只能有一个client端去进行操作,不可能并发同时去修改 一个节点id,所以说呢,当我C1操作完了之后,自动会把znode给清空,我清空的时候别人一访问的时候,id为1的先get一下, 一看id为1的没有,怎么办,我就create出来,我create一个临时节点,就相当于一个保护的作用,像一把锁,你先去查一下zookeeper id等于1的数据别人用不用着,用着我就在这里等着,临时节点清空了,再去创建一个,加了一把锁一样,zookeeper原生的就是这么去 实现的,我这么讲的你理不理解,能理解这个意思吗,一个会话创建一个testNode,如果没有关闭的话,别的会话就创建不了了, 这肯定是建不了的,他就是那个机制,差不多他这个分布式锁就是这么做的 比如我现在有C1,C2两个客户端,C1客户端,这里是C2客户端,他们想同时操作数据库里的一张表,两个都想操作数据库,一个IP是81, 然后C2的IP是82,都要操作同一份user数据,ID等于1,都要操作这个,81和82都要操作同样的一条数据,会有一个什么情况呢, 先去做分布式锁的时候第一步是这样的,第一步首先get,比如说我这个节点就写成这样,get /usr/1,id直接写成1,或者string类型 的字符串, get /usr/stringuuid,这个你自己随便起,比如这个人的是ID:1001,user下面去挂一个1001,get /usr/1001, 在user这个节点下,有一个1001节点,在来一个1002节点,在来一个1003节点,就表示不同的数据,1001的数据全路径就是/user/1001, 这是一个全路径,它是key,value是一个数组,你自己随便去取,首先C1想去做并发操作的时候做分布式锁的时候,他首先第一步是 get,get /user/1001, zookeeper的时候看有没有这个节点,如果没有的话,那我就进行create,create一个什么啊,create一个 Node,`Node是一个临时节点类型,我就执行我的UPDATE操作了,update我这台81机器的持久化操作,然后我持久化操作完了之后呢, 我现在肯定不是释放,肯定是81和82进行数据同步,你要保证这两台节点的数据相等,里面有一个name属性,NAME属性,或者是COUNT, 计数器,COUNT=5,我这边并发一旦大了,COUNT就改成6了,改成6了之后,现在COUNT变成6了,6一定要和82服务器同步,要不然你持久化 操作是这种机制,如果同步不成功的话,就返回持久化失败,抛出异常或者怎么怎么样了,然后就是把zk这个节点释放,你自己去想吧, 总之加入正常的情况下,C2这个节点也变成6了,假设我最后做完这个事之后,我要调zk.close(),我这个一close的话, 当前这个会话就结束了,当前这个会话一结束,临时节点就已经消失了,其他人如果想访问1001的话,就会创建一个临时节点了, C1比如他在执行UPDATE的过程中,C2能去操作这条记录吗,你得去看一下,你得去看 get /user/1001返回的是一个路径还是一个 null值,如果他get之后,返回的是一个null空值的话,那怎么办啊,就表示是有其他的节点在操作的,那你就等着,什么时候那边完成了, 你这边才能做这个工作,大体上就是这个事,能理解我说的意思吧,这样的,你持久化节点的话,zookeeper为什么要提供temp节点, 临时的节点呢,主要目的就是这个事,持久化的话效率可能会低一些,临时节点删除和新增性能好一点,你说的是什么问题, get不到返回空你就创建,什么是get不到,get不到我肯定就是create了,两个都get不到,同时创建就抛异常了吧,两个都get不到, 不存在同时创建的问题,你要知道zookeeper的性能是很好的,就是说你要知道,就是这个集群中,同一个时间点只能有一个人去 这里找,不可能是一个时间点两个请求,精确到毫秒级的,如果他get不到的话,那他直接就create了,create之后, 会有一个id产生,这边他去做的时候,他去创建的时候,你最后创建成功了,你要从代码的角度去讲,即使是有网络的原因, 可能情况是这样的,这边网络有点慢,去zookeeper去取的时候,没有id为1001的节点,结果创建成功了,就是创建的时候, C2在get的时候,发现也没有这个ID,他也去创建了,他创建可能也需要很长的时间,但是你最后创建成功了,结果肯定只有一个, 创建成功了就马上同步上去了,当你C2节点又再创建的时候,我就发现已经有了,刚才我已经看到了,重复去创建的话, 就抛异常了,肯定就抛异常了,抛异常那就好办了,就证明我这个节点已经存在了,不可能存在同一个时间点存在两份的情况, 如果要是这么说的话,数据库并发量大的话,内部肯定是需要做原子消息广播加锁的,不需要考虑这些问题,不可能同一个时间节点 创建两份数据,基本上如果是这样的话,那这个zookeeper就没法用了,即使你两个client去get的时候有时间差,这个可以理解, 但是你最后创建节点的时候,一个先一个后,不可能第一个请求是0.00001秒,他先创建成功了,第二个节点的时间肯定会差一点点, 比如晚一点点,0.00002秒,他创建的时候,直接到zookeeper上去创建,之前已经有了一个创建成功了,报异常就回滚,失败, 证明有人去创建了的,能理解我说的意思吧,get的话网络的延迟,他们两个不是一个网络,第一个get的时候发现没有, 第二个get的时候也发现没有,那我就去创建节点,只能创建一个,再创建一个的时候就抛异常了,这块怎么去实现分布式锁, zookeeper内部的原理,简单介绍一下 package com.learn.zookeeper.base; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; /** * 找一个就是一个helloworld程式了 * 我打开看一下 * 基本上就是一个最简单的配置了 * * @author Leon.Sun * */ public class ZookeeperBase { /** zookeeper地址 */ /** * 集群是用逗号分隔的 * MQ是分号 * 这里是逗号分隔的 * 当然其实无所谓 * 你写一个就可以了 * 这是咱们的连接串 */ static final String CONNECT_ADDR = "59.110.138.145:2181"; // static final String CONNECT_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181"; /** session超时时间 */ /** * 这里有一个超时时间 * 其实你在zoo.cnf * 配置了一堆东西 * 其实你在客户端连的时候你可以自己配 * 这个就是一个SESSION超时的时间 * client和zookeeper通信保持的时间 * 选择的是5秒就是5000毫秒 * 5秒钟如果还连不上zookeeper * 我就认为超时了 * */ static final int SESSION_OUTTIME = 2000;//ms /** 信号量,阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */ /** * 这个还有印象吗 * CountDownLatch这个东西 * 他能干什么事呢 * 它是一个等待 * 一个等待的效果 * 比如你那里有一个线程 * 咱们看一下这个场景你就明白了 * 我现在就用到了CountDownLatch * 总之我把这个实例化出来了 * 配置的是1 * 1个等待就足够了 * * 为什么要在这里加一个这个东西呢 * 其实原理很简单 * 因为咱们的程序连zookeeper是一个异步的 * 你比如我这里有一个client端 * 这个client端就是咱们写的代码 * 然后咱们在这里new出一个zookeeper实例 * zk实例我new出来以后 * 我就回调watcher * 回调watcher他的代码会一直玩下走 * 走到new Watcher()的时候 * 他去做了一个真正的connect * 就是我代码会一直往下走的 * 我连接我zookeeper集群是异步的 * 可能我连这个花1,2秒 * 连的时候稍微有点长 * 我的代码就直接往下走了 * zookeeper实例还没有真正创建好了 * 我连接可能还没有连接好 * 这个代码已经执行其它代码了 * 比如我要create一个节点 * 创建一个节点 * 那就不行了 * 因为你这个zk实例还没有真正的连接上 * 以后肯定是通过一个zk来create一个node * 这个zk如果没有连接上的时候 * 就会发生报空指针异常 * 本身这对象啥也没有 * 你点create肯定会报异常 * 会有这个问题 * 所以我现在就用这种方式去解决了 * * */ static final CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 看一下主函数main * * @param args * @throws Exception */ public static void main(String[] args) throws Exception{ /** * 你在这里看一下zk * org.apache.zookeeper.ZooKeeper * 现在用的是apache的zookeeper * 是我引入的zookeeper-3.4.5这个包 * 这个类就相当于client端实例了 * 实例化new一个就足够了 * 传递的什么东西呢 * 几个参数 * ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) * connectString连接串 * 第二个是int类型的sessionTimeout * 就是session超时的时间 * 第三个东西叫做Watcher * 发现这个是新接触的名词 * 他就相当于一个观察者 * 最开始介绍角色的时候 * zookeeper这个环境中一共有三种角色 * 第一个就是leader * 然后是follower * 这个就不用说了 * 第三个就是Observer * Observer是一个特殊的follower * 你也可以把Observer当做一个Watcher * 可以理解为观察者 * 其实就是咱们的client端 * 然后他需要一个Watcher * 我就把它实例化出来 * new一个Watcher * 它是一个接口 * 你把它实例化 * 要重新写一个方法 * 就是process方法 * 那Watcher new出来的时候你会发现 * override一个process方法 */ ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){ /** * 里面传递一个参数 * 一个WatchedEvent * 监控的事件 */ @Override public void process(WatchedEvent event) { /** * 下面有一个KeeperState * 用event.getState能获得KeeperState * 获得keeperState * 获取事件的状态 * * 通过event get这个状态 * */ KeeperState keeperState = event.getState(); /** * event能够getState * 也能getType * 事件的类型和时间的状态 * 获取事件的类型 */ EventType eventType = event.getType(); //如果是建立连接 /** * 建立连接这个代码都是很固定的 * 事件都有什么状态啊 * KeeperState.SyncConnected就是连接成功的状态 * 看一下有什么状态 * 有这么多种状态吧 * Disconnected连接失败 * Expired过期 * NoSyncConnected没有连接上 * UnKnow也是连接失败的 * 等等一些状态吧 * 这个事件的状态 * SyncConnected就是连接成功的状态 * KeeperState.SyncConnected这个是一个值 * 这个值是一个死的 * 就是连接成功比如OK * * 如果这个状态和keeperState想等的话 * 那就表示连接成功了 */ if(KeeperState.SyncConnected == keeperState){ /** * 连接成功我再做 * 再做一次判断 * EventType事件的类型 * 如果是None是连接开始 * 刚连接成功是啥事件也没有 * 所以就是None * 如果None就等于getType的话 * 那就表示连接成功 * 然后这里面再看一眼 * 他有好多种事件的类型 * 事件的类型和状态是两码事 * 一会详细去说 * 事件类型有好多 * NodeChildChanged我子节点发生变更 * 还有NodeCreated * 还有NodeDataChanged数据变化 * NodeDeleted * None什么也没做 * 他里面提供了5种状态 * 刚连接的时候是啥也没干 * None这种状态 * 那就表示连接成功了 * 连接成功了做什么事了 * */ if(EventType.None == eventType){ /** * 假如你要模拟连接时间很长 */ try { /** * 我等待2秒 * 然后再countDown */ System.out.println("开始连接....."); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } /** * 然后咱们的zk已经建立连接了 * * 什么时候zk真正连接成功了 * 这句话打印了 * 满足EventType.SyncConnected == eventType这个条件和 * KeeperState.SyncConnected == keeperState这个条件了 * SyncConnected和SyncConnected我就打印这句话 * 把countDown去放行 * 然后继续往下走 * 就是这个机制 * 能理解我说的意思吗 * 想一想是不是这么回事 * 差不多就是这样的 * 这是最原始最古老的写法 * 用的是原生的API * 只有这种方式 * * 然后他才执行 * 咱们试一下吧 * * 现在是他先执行 * 然后这边才这么去做 * * 这个代码是并行的 * 这个代码一放行 * 这个代码就进行往下走 * */ System.out.println("zk 建立连接"); //如果建立连接成功,则发送信号量,让后续阻塞程序向下执行 /** * 我做了一件事情 * connectedSemaphore.countDown() * 继续往下走 * * 我这里countDown了 * * 你要理解countDown到底是什么含义就够了 * */ connectedSemaphore.countDown(); // try { // /** // * 我在这里sleep2秒 // * 基本上是这样的 // * 他在连的时候等3秒 // * 3秒之后咱们来看一下 // * 它会发生什么情况 // * 你想想会有什么情况 // * 在这里写sleep 2秒 // */ // Thread.sleep(2000); // } catch (InterruptedException e) { // e.printStackTrace(); // } } } } }); //进行阻塞 /** * 当代码执行到这里的时候 * 是阻塞的 * 程序就不往下走了 * 就停在这里 */ connectedSemaphore.await(); /** * 结果是这个先执行 * 因为这里是主线程 * 主线程停了其他的就停了 * 我主线程要是停了的话 * 相当于JAVA代码全都停止了 * 其他线程也就不好使了 * 里面的线程都得给我停掉 * 能理解这个意思吧 * 其实什么意思 * 你会发现 * */ System.out.println("执行啦.."); /** * 所以这里sleep一下 * sleep时间长就是5秒吧 */ Thread.sleep(5000); //创建父节点 /** * 这里把包导进来 * * 假如我现在执行的代码是这样的 * create(String path, byte[] data, List<ACL> acl, CreateMode createMode) * create方法里面传了4个参数 * 第一个就是path * path什么意思呢 * 你要创建节点节点的路径是什么 * 第二个就是一个byte类型的data * 就对应path的value * 就好像/testRoot这个是key * "testRoot".getBytes()这个是value * 然后后面是一个认证 * 一个ACL认证 * 其实这个无所谓 * 就是OPEN_ACL_UNSAFE * 就是一个安全的东西 * 一个静态的Ids * 咱们一般都选择这项 * 其实还有其他的 * 这个你不用担心 * 一般来说都会选择OPEN_ACL_UNSAFE * 然后还有一个CreateMode * 你创建节点的模式是什么 * 创建模式有很多种 * 有4种 * 首先我创建的是PERSISTENT * 持久化的 * 有4种方式 * 持久化的普通节点 * 还有持久化的顺序节点 * PERSISTENT SEQUENTIAL * 还有一个临时节点 * EPHEMERAL * 还有临时的顺序节点 * 反正就是这4种方式 * 提供给创建人员 * 我现在使用的是持久化方式的 * 然后我去做这种事情 * * 咱们看一下API行不行 * 咱们直接登录zkCli.sh就可以了 * 然后ls / * 目前根节点下只有一个zookeeper * 我先做是要在根节点下创建一个testRoot * 建立完连接以后呢 * 也没有返回值 * String org.apache.zookeeper.ZooKeeper.create() * 有一个String * 应该是节点的信息吧 * 现在我有一个问题产生了 * 刚才其实我已经创建完节点了 * 我可以看一下 * ls / * 下面一定是有一个testRoot这个节点的 * [zookeeper, testRoot] * 我要是想获得就使用get /testRoot * 我们能取得testRoot的值testRoot * 也叫testRoot * 就是我再去create一次 * 我现在再去创建一次 * 同样的节点/testRoot * 值发生变化"testRoot11" * 现在能创建还是不能创建啊 * 你想一想能不能创建 * 就是这个节点已经有了 * 我现在还想创建还能不能创建 * 你想一想猜一猜 * KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /testRoot * NodeExists有了一个节点已经重复了 * 所以有节点就不能创建了 * 这样的话有节点就不能创建了 * 咱们就删掉 * delete /testRoot * 咱们再次去创建一下 * 这种PERSISTENT是持久化的方式 * 返回的内容你也看到了 * 就是/testRoot * 这就是创建节点 * 然后这个内存就变成了testRoot11 * 也可以看看 * 咱们get /testRoot * 变成testRoot11这个值了 * 当前testRoot已经有这个节点了 * */ // String ret = zk.create("/testRoot", "testRoot11".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // System.out.println(ret); //创建子节点 /** * 然后我想创建一个子节点该怎么去写啊 * 你必须要这么去写 * /testRoot/children * 你必须写testRoot下面的children * 就是这个children * 然后还有一个问题就是说 * 咱们先看这个创建子节点吧 * 就是在/testRoot下面再加一个 * 写法就是这样的 * 然后整体的key的value值就是"children data".getBytes()这个 * 然后还是使用OPEN_ACL_UNSAFE这种方式 * 然后EPHEMERAL这块注意 * 这是一个临时节点 * 临时节点我记得是有一个时间的 * 他就会被清空 * 但是我忘了临时节点是多长时间了 * 你可以百度一下查一下 * 应该默认是一分钟还是多久我忘了 * ls /testRoot * [] * String org.apache.zookeeper.ZooKeeper.create * 返回一个String * 这个是一个临时的创建节点 * 在testRoot下创建一个临时节点 * */ // zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String ret = zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); /** * /testRoot/children * 其实创建成功了 * 因为你是临时节点 * 直接就清空了 * 时间也太短了 * 直接过长我就忘记了 * 反正刚才咱们已经看到了,他既然是有返回值,既然run as 有返回值 * 这个节点马上又被删除掉了 * 他就是创建完了就直接停了 * 我想起来了 * 是这样的 * 本次会话有效 * 下次会话就没效了 * */ System.out.println(ret); /** * 咱们在这里sleep10秒 * 就是暂停10秒钟 * 这样去看就能看到效果了 * 这个children就一直还在 * 然后你等我这个程序停了以后 * 我这个child就没了 * 也就是说这个临时节点啊 * 保持这个zk本次会话有效 * 临时会话本次有效 * 他可以去做一个什么事啊 * 好多人去那他做一个分布式锁啊 * zookeeper去做一个分布式锁是为一个临时节点去创建的 * 但是这个就有点底层了 * 简单给大家就解释一下吧 * 它是怎么去做的呢 * 是这样的 * * */ Thread.sleep(10000); /** * 比如我现在有C1,C2两个客户端 * C1客户端 */ /** * 这里是C2客户端 * 他们想同时操作数据库里的一张表 */ //获取节点洗信息 // byte[] data = zk.getData("/testRoot", false, null); // System.out.println(new String(data)); // System.out.println(zk.getChildren("/testRoot", false)); //修改节点的值 // zk.setData("/testRoot", "modify data root".getBytes(), -1); // byte[] data = zk.getData("/testRoot", false, null); // System.out.println(new String(data)); //判断节点是否存在 // System.out.println(zk.exists("/testRoot/children", false)); //删除节点 // zk.delete("/testRoot/children", -1); // System.out.println(zk.exists("/testRoot/children", false)); /** * 做完了之后我就会去做zk的close * 你建立完连接之后呢要关闭 */ zk.close(); } }

     


    最新回复(0)