讲了zookeeper的简单的介绍,以及环境搭建,还有zkClient的使用,基本上很简单,作为HelloWorld,今天继续往下走,
既然已经把简介和环境搭建完了,然后一会来说说配置,配置也没有什么说太多的,那我这里有一个配置的文档,发成PDF
的形式发给你了,就是有一个zookeeper的配置文件,配置文件基本上也就这几个,首先有一个tickTime,就是客户端和
服务器端维持心跳的一个时间间隔,他这个默认也是有一个时间的,你自己看看默认的配置,dataDir就是存储咱们数据的
地方,然后有一个clientPort,客户端的端口,还有一个initLimit,就是说你的客户端,你想连我的zookeeper,如果还没有
连上,就认为失败,总的长度你可以自己去指定,syncLimit,就是leader和follower之间的时间间隔,之前发消息的时间间隔,
他不能够超过=多少时间长度,就挂了,server.0,server.1,server.2,server.3,一共有四个参数需要注意一下,A表示是第几号
服务器,B是IP地址了,C是2888,2888他指的是什么啊,C是这个服务器与集群中的leader服务器交换信息的端口,2888是进行通信的
端口号,然后3888,后面的D,表示什么啊,D就表示一个选举了,一个leader如果挂了话,就需要另外的进行选举,就是一个新的leader,
就是3888,基本上配置很简单,你可以自己看一下,或者上网找点资料
(一)Zookeeper基础知识、体系结构、数据模型
1 zookeeper是一个类似hdfs的树形文件结构,zookeeper可以用来保证数据在(zk)集群之间的数据的事务性一致、
2 zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher
3 zookeeper有三个角色:Learner,Follower,Observer
4 zookeeper应用场景:
统一命名服务(Name Service)
配置管理(Configuration Management)
集群管理(Group Membership)
共享锁(Locks)
队列管理
(二) Zookeeper配置(搭建zookeeper服务器集群)
1.1 结构:一共三个节点 (zk服务器集群规模不小于3个节点),要求服务器之间系统时间保持一致。
1.2 上传zk
进行解压: tar zookeeper-3.4.5.tar.gz
重命名: mv zookeeper-3.4.5 zookeeper
修改环境变量: vi /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=.:$HADOOP_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/...
刷新: source /etc/profile
到zookeeper下修改配置文件
cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo.cfg
修改conf: vi zoo.cfg 修改两处
(1)dataDir=/usr/local/zookeeper/data
(2)最后面添加
server.0=bhz:2888:3888
server.1=hadoop1:2888:3888
server.2=hadoop2:2888:3888
服务器标识配置:
创建文件夹:mkdir data
创建文件myid并填写内容为0:vi myid (内容为服务器标识 : 0)
进行复制zookeeper目录到hadoop01和hadoop02 还有/etc/profile文件
把hadoop01、hadoop02中的myid文件里的值修改为1和2 路径(vi /usr/local/zookeeper/data/myid)
启动zookeeper:
路径:/usr/local/zookeeper/bin
执行:zkServer.sh start (注意这里3台机器都要进行启动)
状态:zkServer.sh status(在三个节点上检验zk的mode,一个leader和俩个follower)
1.3 操作zookeeper (shell)
zkCli.sh 进入zookeeper客户端
根据提示命令进行操作:
查找:ls / ls /zookeeper
创建并赋值:create /bhz hadoop
获取:get /bhz
设值:set /bhz baihezhuo
可以看到zookeeper集群的数据一致性
创建节点有俩种类型:短暂(ephemeral) 持久(persistent)
(三) zoo.cfg详解:
tickTime: 基本事件单元,以毫秒为单位。这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,
也就是每隔 tickTime时间就会发送一个心跳。
dataDir: 存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
clientPort: 这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
initLimit: 这个配置项是用来配置 Zookeeper 接受客户端初始化连接时最长能忍受多少个心跳时间间隔数,
当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,
那么表明这个客户端连接失败。总的时间长度就是 10*2000=20 秒。
syncLimit: 这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,
最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5*2000=10 秒
server.A = B:C:D :
A表示这个是第几号服务器,
B 是这个服务器的 ip 地址;
C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;
D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader
咱们继续往下走吧,121,122,123,这三个节点启起来以后,然后整体的zkServer.sh start,回车,三个节点我都要提供zookeeper,
然后咱们看一下状态,然后看一下三个节点的状态,可能是启动的并不是那么快,尤其每个节点每台机器只有一个G的内存,内存到是
没有什么变化,还没起来,咱们再来一次,这回起来了
有的时候你等待的时间比较长,你下次再status的时候再起来了
这回123是一个leader,然后21和22是一个follower,主从吗,zookeeper起来了以后,cd到/usr/local下,我们可以通过
eclipse来看一下,还是原始默认的情况
然后咱们启动zookeeper以后呢,之前把shell和环境搭建,刚才其实讲了一下配置,在这里其实都已经写好了
你可以自己去看一看配置文件, vim /usr/local/zookeeper/conf/zoo.cfg,咱们去看一下,他这里都是有默认时间的,
tickTime=2000这个2000是2000毫秒,服务器端和客户端的心跳,还有initLimit=10,都有的,然后还有同步的时间,
syncLimit=5,这个更短了,这个是5毫秒,还有其他的,dataDir,clientPort=2181,这个是对外提供服务的一个端口,
zookeeper默认的端口就是2181,对外提供的端口,内部提供的端口,一个是做内部通信的,就是121,122,123三者通信就用2888,
如果leader挂了的话,就用3888去做,其实这个配置非常简单
咱们再往下去走,去看
如果我想用JAVA去操作zookeeper,你用什么版本都行,咱们现在用的是3.4.5,其实zookeeper有一套原生的API,
然后有一个zkClient,zkClient是在原生API基础上进行扩展的开源JAVA客户端,但是原生有一个API,扩展有第三方的API,
最好用的是Curator API,一点点说吧,首先简单看看JAVA代码吧,我现在用的是3.4.5,直接打开eclipse,咱们这个项目
其实已经准备好了
现在我就用zookeeper-3.4.5.jar这一个就够了,然后包括zkclient的API,curator的API,假如你的项目现在引入了3.4.5,
然后怎么去用,我这里有这么多demo,一起来看看,首先看一下base,zookeeper下的base
首先zookeeper实例化这个,这里的构造方法参数肯定都不同,有好多种方式,你会发现这个只是一个比较简单的API,
其实这块参数还有很多,canBeReadOnly只读,然后还有些SessionId,还有些sessionPassword,就是做认证的,还有很多东西,
有不同的构造方法,提供了4种构造方法,他提供了4种构造方法,参数不同,首先connectString这个就不说了,心跳检测的
超时时间,事件处理器,canBeReadOnly当前会话是否只读,sessionId和sessionPasswd,这块其实用的不是很多,就是你这个
zookeeper需要一个sessionId和password,然后用zookeeper连的时候需要做一个认证,然后可以提供一个重复对话,我就
可以用两个client端进行操作,两个client同时可以去登,保证他们两个操作的数据是可以共享的,怎么理解呢,就是两个
客户端共享一个session,他们两个操作是共享的,反正他也提供了这个写法了,我一般用的比较少,然后zookeeper客户端
和服务端会话的建立是一个异步的过程,我们的程序在处理连接的时候,可能是立即返回,连接成功就是咱们看到的那个,
synchronized这个状态的时候,才算是真正的连接成功,所以因为异步的创建会话的问题,咱们就得使用CountDownLatch
一个协调了,就是刚才说的,然后继续往下看,创建节点一般是znode,这个方法就是非常简单,create,他提供了两个API,
同步创建,还有一种是异步创建,其实这个API我只是想简单讲一下,我不想讲的太深入,你工作中一般不会用到这套API,
不会用到zkClient,都会用到curator,因为太难用了,这套API
咱们看一下吧,这是一些参数,直接看代码比较直观,临时节点应该是有一定的时间的,咱们先不说为什么临时节点创建完了
就直接删除了,咱们一会再看吧,API也没有说临时节点创建多久,反正刚才咱们已经看到了,他既然是有返回值,既然run as
java 有返回值,zookeeper一定是三台机器,三个节点同步的,数据是同步的,两个follower,一个master,比如说咱们举个例子,
咱们有一个client端C1,再来一个client端C2,咱们知道zk是遵循原子算法的,它是单一视图的,3个节点上的数据是保持一致的,
比如这是根节点/,根节点下有一个/zookeeper节点,然后有一个/testRoot这个节点,然后/testRoot节点下还有个节点
就是/children,现在我做的就是临时的C1,可能开始没有/children,开始没有/children这个节点,
然后我的这个C1客户端,我怎么去实现分布式锁啊,那我就这么去做,当我C1去做一个操作的时候,
也就在某一个节点下,在testRoot这个节点下,建立一个children,建立一个临时的,
临时的节点,然后你其他的数据,总之他创建临时节点是为了干什么,就是为了占领一把锁,然后他就去做修改数据了,我去做一些
其他的数据库操作啊,或者做什么操作也好,做完了这个操作以后,然后再把这个children移除,做完操作什么意思,就是zk已经
close掉了,close掉就相当于结束了,结束了之后呢,这个C2他想操作,才能操作,我快速把这个问题说完,就是他用一个临时的节点
去做一个值,其实实现起来非常的easy,我写个伪代码你们就都懂了,我现在想做一个分布式的操作,咱们这么去理解吧,还是刚才
的zookeeper,这边有3个节点,然后我先做有2个client端,C1变成C2,然后他们想操作同一个数据库,比如同一个数据,或者两个数据库,
都无所谓了,就是说现在有一个问题,什么叫分布式呢,你整体的环境中有好几份应用,就是咱们要开发一个WEB程序,首先
我部署到C1上了,然后我又把同样的程序部署到上了,我生产环境中前端挂一个Nginx,可能我又挂了好几层Nginx,
这个就无所谓了,总之就实现负载均衡了,一会请求他,一会请求他,这个节点其实是我自己的应用,他肯定是一个物理机啊,
自己有对应的JDK,有自己的JVM,对应的有自己的JVM,这边C2也是一样的,他自己也有对应的JVM,你要理解这个事,
然后开发的时候是同一个代码部署到C1和C2上的,那么问题就来了,如果说我当前有一个用户,去访问里面同样的代码,
我数据库好多user,我要访问user,user原先的age年龄,比如25,然后我就要实现在同一个时间点,你不能同一个时间点去减值和
加值,如果并发量非常大的话,当然我这里是分布式数据库,如果是一个数据库无所谓,如果是两个数据库有数据同步的话,
第一个节点访问自己的db,第二个节点也访问自己的DB,然后两个DB是MYSQL的主从,或者ORACLE的做双击热备,这个都是可以的,
那就相当于我这个DB里面有一张user表,userId比如1,我这个userId也是1,我这个age,你随便找一个值吧,这个age原先是25,
这个数据也是25,用户肯定是操作界面了,一堆用户,并发的去访问Nginx,相当于负载在C1机器上,也负载在C2机器上了,
并发就会导致数据不一致的问题,可能就是有一种情况,有n多个请求来对25进行加或者减操作,有可能出现一种情况,
就是重复的加,你当前一个请求过来,看到25,对他进行++了,然后他就变成26了,你进行加加的过程中,可能执行逻辑还没有
加加,最后才加加呢,由于是两个JVM,数据就变成26了,就会有这个问题,可能加重了,要不然减少了,那咱们怎么做啊,是这样的,
我利用这个节点,client端一个请求过来的时候,我先用zookeeper去创建一个临时的节点,这个临时节点可以随意,
临时节点就写一个ID,这个ID就等于1,那这样的意思就是什么啊,当C2想对我数据库user进行操作的时候,C1操作也是,
C1想对age加1,C1加1之前先不做其他事,他先去zookeeper上先去create一个node,就是一个临时的znode,它是1,这边也是
想做一个相同的操作,C2肯定也是去create一个znode,C2也想创建ID为1的,你会发现他创建不了,他就先get一遍,
首先是先get,看看zookeeper上有没有id这个节点,id等于1的这个节点,有的话我就不创建就等待,没有的话我就create一个id,
由于zookeeper支持同一个时间点只能有一个client端,并发再并发只能有一个client端去进行操作,不可能并发同时去修改
一个节点id,所以说呢,当我C1操作完了之后,自动会把znode给清空,我清空的时候别人一访问的时候,id为1的先get一下,
一看id为1的没有,怎么办,我就create出来,我create一个临时节点,就相当于一个保护的作用,像一把锁,你先去查一下zookeeper
id等于1的数据别人用不用着,用着我就在这里等着,临时节点清空了,再去创建一个,加了一把锁一样,zookeeper原生的就是这么去
实现的,我这么讲的你理不理解,能理解这个意思吗,一个会话创建一个testNode,如果没有关闭的话,别的会话就创建不了了,
这肯定是建不了的,他就是那个机制,差不多他这个分布式锁就是这么做的
比如我现在有C1,C2两个客户端,C1客户端,这里是C2客户端,他们想同时操作数据库里的一张表,两个都想操作数据库,一个IP是81,
然后C2的IP是82,都要操作同一份user数据,ID等于1,都要操作这个,81和82都要操作同样的一条数据,会有一个什么情况呢,
先去做分布式锁的时候第一步是这样的,第一步首先get,比如说我这个节点就写成这样,get /usr/1,id直接写成1,或者string类型
的字符串, get /usr/stringuuid,这个你自己随便起,比如这个人的是ID:1001,user下面去挂一个1001,get /usr/1001,
在user这个节点下,有一个1001节点,在来一个1002节点,在来一个1003节点,就表示不同的数据,1001的数据全路径就是/user/1001,
这是一个全路径,它是key,value是一个数组,你自己随便去取,首先C1想去做并发操作的时候做分布式锁的时候,他首先第一步是
get,get /user/1001, zookeeper的时候看有没有这个节点,如果没有的话,那我就进行create,create一个什么啊,create一个
Node,`Node是一个临时节点类型,我就执行我的UPDATE操作了,update我这台81机器的持久化操作,然后我持久化操作完了之后呢,
我现在肯定不是释放,肯定是81和82进行数据同步,你要保证这两台节点的数据相等,里面有一个name属性,NAME属性,或者是COUNT,
计数器,COUNT=5,我这边并发一旦大了,COUNT就改成6了,改成6了之后,现在COUNT变成6了,6一定要和82服务器同步,要不然你持久化
操作是这种机制,如果同步不成功的话,就返回持久化失败,抛出异常或者怎么怎么样了,然后就是把zk这个节点释放,你自己去想吧,
总之加入正常的情况下,C2这个节点也变成6了,假设我最后做完这个事之后,我要调zk.close(),我这个一close的话,
当前这个会话就结束了,当前这个会话一结束,临时节点就已经消失了,其他人如果想访问1001的话,就会创建一个临时节点了,
C1比如他在执行UPDATE的过程中,C2能去操作这条记录吗,你得去看一下,你得去看 get /user/1001返回的是一个路径还是一个
null值,如果他get之后,返回的是一个null空值的话,那怎么办啊,就表示是有其他的节点在操作的,那你就等着,什么时候那边完成了,
你这边才能做这个工作,大体上就是这个事,能理解我说的意思吧,这样的,你持久化节点的话,zookeeper为什么要提供temp节点,
临时的节点呢,主要目的就是这个事,持久化的话效率可能会低一些,临时节点删除和新增性能好一点,你说的是什么问题,
get不到返回空你就创建,什么是get不到,get不到我肯定就是create了,两个都get不到,同时创建就抛异常了吧,两个都get不到,
不存在同时创建的问题,你要知道zookeeper的性能是很好的,就是说你要知道,就是这个集群中,同一个时间点只能有一个人去
这里找,不可能是一个时间点两个请求,精确到毫秒级的,如果他get不到的话,那他直接就create了,create之后,
会有一个id产生,这边他去做的时候,他去创建的时候,你最后创建成功了,你要从代码的角度去讲,即使是有网络的原因,
可能情况是这样的,这边网络有点慢,去zookeeper去取的时候,没有id为1001的节点,结果创建成功了,就是创建的时候,
C2在get的时候,发现也没有这个ID,他也去创建了,他创建可能也需要很长的时间,但是你最后创建成功了,结果肯定只有一个,
创建成功了就马上同步上去了,当你C2节点又再创建的时候,我就发现已经有了,刚才我已经看到了,重复去创建的话,
就抛异常了,肯定就抛异常了,抛异常那就好办了,就证明我这个节点已经存在了,不可能存在同一个时间点存在两份的情况,
如果要是这么说的话,数据库并发量大的话,内部肯定是需要做原子消息广播加锁的,不需要考虑这些问题,不可能同一个时间节点
创建两份数据,基本上如果是这样的话,那这个zookeeper就没法用了,即使你两个client去get的时候有时间差,这个可以理解,
但是你最后创建节点的时候,一个先一个后,不可能第一个请求是0.00001秒,他先创建成功了,第二个节点的时间肯定会差一点点,
比如晚一点点,0.00002秒,他创建的时候,直接到zookeeper上去创建,之前已经有了一个创建成功了,报异常就回滚,失败,
证明有人去创建了的,能理解我说的意思吧,get的话网络的延迟,他们两个不是一个网络,第一个get的时候发现没有,
第二个get的时候也发现没有,那我就去创建节点,只能创建一个,再创建一个的时候就抛异常了,这块怎么去实现分布式锁,
zookeeper内部的原理,简单介绍一下
package com.learn.zookeeper.base;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
* 找一个就是一个helloworld程式了
* 我打开看一下
* 基本上就是一个最简单的配置了
*
* @author Leon.Sun
*
*/
public class ZookeeperBase {
/** zookeeper地址 */
/**
* 集群是用逗号分隔的
* MQ是分号
* 这里是逗号分隔的
* 当然其实无所谓
* 你写一个就可以了
* 这是咱们的连接串
*/
static final String CONNECT_ADDR = "59.110.138.145:2181";
// static final String CONNECT_ADDR = "192.168.80.88:2181,192.168.80.87:2181,192.168.80.86:2181";
/** session超时时间 */
/**
* 这里有一个超时时间
* 其实你在zoo.cnf
* 配置了一堆东西
* 其实你在客户端连的时候你可以自己配
* 这个就是一个SESSION超时的时间
* client和zookeeper通信保持的时间
* 选择的是5秒就是5000毫秒
* 5秒钟如果还连不上zookeeper
* 我就认为超时了
*
*/
static final int SESSION_OUTTIME = 2000;//ms
/** 信号量,阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */
/**
* 这个还有印象吗
* CountDownLatch这个东西
* 他能干什么事呢
* 它是一个等待
* 一个等待的效果
* 比如你那里有一个线程
* 咱们看一下这个场景你就明白了
* 我现在就用到了CountDownLatch
* 总之我把这个实例化出来了
* 配置的是1
* 1个等待就足够了
*
* 为什么要在这里加一个这个东西呢
* 其实原理很简单
* 因为咱们的程序连zookeeper是一个异步的
* 你比如我这里有一个client端
* 这个client端就是咱们写的代码
* 然后咱们在这里new出一个zookeeper实例
* zk实例我new出来以后
* 我就回调watcher
* 回调watcher他的代码会一直玩下走
* 走到new Watcher()的时候
* 他去做了一个真正的connect
* 就是我代码会一直往下走的
* 我连接我zookeeper集群是异步的
* 可能我连这个花1,2秒
* 连的时候稍微有点长
* 我的代码就直接往下走了
* zookeeper实例还没有真正创建好了
* 我连接可能还没有连接好
* 这个代码已经执行其它代码了
* 比如我要create一个节点
* 创建一个节点
* 那就不行了
* 因为你这个zk实例还没有真正的连接上
* 以后肯定是通过一个zk来create一个node
* 这个zk如果没有连接上的时候
* 就会发生报空指针异常
* 本身这对象啥也没有
* 你点create肯定会报异常
* 会有这个问题
* 所以我现在就用这种方式去解决了
*
*
*/
static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
/**
* 看一下主函数main
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
/**
* 你在这里看一下zk
* org.apache.zookeeper.ZooKeeper
* 现在用的是apache的zookeeper
* 是我引入的zookeeper-3.4.5这个包
* 这个类就相当于client端实例了
* 实例化new一个就足够了
* 传递的什么东西呢
* 几个参数
* ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
* connectString连接串
* 第二个是int类型的sessionTimeout
* 就是session超时的时间
* 第三个东西叫做Watcher
* 发现这个是新接触的名词
* 他就相当于一个观察者
* 最开始介绍角色的时候
* zookeeper这个环境中一共有三种角色
* 第一个就是leader
* 然后是follower
* 这个就不用说了
* 第三个就是Observer
* Observer是一个特殊的follower
* 你也可以把Observer当做一个Watcher
* 可以理解为观察者
* 其实就是咱们的client端
* 然后他需要一个Watcher
* 我就把它实例化出来
* new一个Watcher
* 它是一个接口
* 你把它实例化
* 要重新写一个方法
* 就是process方法
* 那Watcher new出来的时候你会发现
* override一个process方法
*/
ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
/**
* 里面传递一个参数
* 一个WatchedEvent
* 监控的事件
*/
@Override
public void process(WatchedEvent event) {
/**
* 下面有一个KeeperState
* 用event.getState能获得KeeperState
* 获得keeperState
* 获取事件的状态
*
* 通过event get这个状态
*
*/
KeeperState keeperState = event.getState();
/**
* event能够getState
* 也能getType
* 事件的类型和时间的状态
* 获取事件的类型
*/
EventType eventType = event.getType();
//如果是建立连接
/**
* 建立连接这个代码都是很固定的
* 事件都有什么状态啊
* KeeperState.SyncConnected就是连接成功的状态
* 看一下有什么状态
* 有这么多种状态吧
* Disconnected连接失败
* Expired过期
* NoSyncConnected没有连接上
* UnKnow也是连接失败的
* 等等一些状态吧
* 这个事件的状态
* SyncConnected就是连接成功的状态
* KeeperState.SyncConnected这个是一个值
* 这个值是一个死的
* 就是连接成功比如OK
*
* 如果这个状态和keeperState想等的话
* 那就表示连接成功了
*/
if(KeeperState.SyncConnected == keeperState){
/**
* 连接成功我再做
* 再做一次判断
* EventType事件的类型
* 如果是None是连接开始
* 刚连接成功是啥事件也没有
* 所以就是None
* 如果None就等于getType的话
* 那就表示连接成功
* 然后这里面再看一眼
* 他有好多种事件的类型
* 事件的类型和状态是两码事
* 一会详细去说
* 事件类型有好多
* NodeChildChanged我子节点发生变更
* 还有NodeCreated
* 还有NodeDataChanged数据变化
* NodeDeleted
* None什么也没做
* 他里面提供了5种状态
* 刚连接的时候是啥也没干
* None这种状态
* 那就表示连接成功了
* 连接成功了做什么事了
*
*/
if(EventType.None == eventType){
/**
* 假如你要模拟连接时间很长
*/
try {
/**
* 我等待2秒
* 然后再countDown
*/
System.out.println("开始连接.....");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 然后咱们的zk已经建立连接了
*
* 什么时候zk真正连接成功了
* 这句话打印了
* 满足EventType.SyncConnected == eventType这个条件和
* KeeperState.SyncConnected == keeperState这个条件了
* SyncConnected和SyncConnected我就打印这句话
* 把countDown去放行
* 然后继续往下走
* 就是这个机制
* 能理解我说的意思吗
* 想一想是不是这么回事
* 差不多就是这样的
* 这是最原始最古老的写法
* 用的是原生的API
* 只有这种方式
*
* 然后他才执行
* 咱们试一下吧
*
* 现在是他先执行
* 然后这边才这么去做
*
* 这个代码是并行的
* 这个代码一放行
* 这个代码就进行往下走
*
*/
System.out.println("zk 建立连接");
//如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
/**
* 我做了一件事情
* connectedSemaphore.countDown()
* 继续往下走
*
* 我这里countDown了
*
* 你要理解countDown到底是什么含义就够了
*
*/
connectedSemaphore.countDown();
// try {
// /**
// * 我在这里sleep2秒
// * 基本上是这样的
// * 他在连的时候等3秒
// * 3秒之后咱们来看一下
// * 它会发生什么情况
// * 你想想会有什么情况
// * 在这里写sleep 2秒
// */
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
}
}
}
});
//进行阻塞
/**
* 当代码执行到这里的时候
* 是阻塞的
* 程序就不往下走了
* 就停在这里
*/
connectedSemaphore.await();
/**
* 结果是这个先执行
* 因为这里是主线程
* 主线程停了其他的就停了
* 我主线程要是停了的话
* 相当于JAVA代码全都停止了
* 其他线程也就不好使了
* 里面的线程都得给我停掉
* 能理解这个意思吧
* 其实什么意思
* 你会发现
*
*/
System.out.println("执行啦..");
/**
* 所以这里sleep一下
* sleep时间长就是5秒吧
*/
Thread.sleep(5000);
//创建父节点
/**
* 这里把包导进来
*
* 假如我现在执行的代码是这样的
* create(String path, byte[] data, List<ACL> acl, CreateMode createMode)
* create方法里面传了4个参数
* 第一个就是path
* path什么意思呢
* 你要创建节点节点的路径是什么
* 第二个就是一个byte类型的data
* 就对应path的value
* 就好像/testRoot这个是key
* "testRoot".getBytes()这个是value
* 然后后面是一个认证
* 一个ACL认证
* 其实这个无所谓
* 就是OPEN_ACL_UNSAFE
* 就是一个安全的东西
* 一个静态的Ids
* 咱们一般都选择这项
* 其实还有其他的
* 这个你不用担心
* 一般来说都会选择OPEN_ACL_UNSAFE
* 然后还有一个CreateMode
* 你创建节点的模式是什么
* 创建模式有很多种
* 有4种
* 首先我创建的是PERSISTENT
* 持久化的
* 有4种方式
* 持久化的普通节点
* 还有持久化的顺序节点
* PERSISTENT SEQUENTIAL
* 还有一个临时节点
* EPHEMERAL
* 还有临时的顺序节点
* 反正就是这4种方式
* 提供给创建人员
* 我现在使用的是持久化方式的
* 然后我去做这种事情
*
* 咱们看一下API行不行
* 咱们直接登录zkCli.sh就可以了
* 然后ls /
* 目前根节点下只有一个zookeeper
* 我先做是要在根节点下创建一个testRoot
* 建立完连接以后呢
* 也没有返回值
* String org.apache.zookeeper.ZooKeeper.create()
* 有一个String
* 应该是节点的信息吧
* 现在我有一个问题产生了
* 刚才其实我已经创建完节点了
* 我可以看一下
* ls /
* 下面一定是有一个testRoot这个节点的
* [zookeeper, testRoot]
* 我要是想获得就使用get /testRoot
* 我们能取得testRoot的值testRoot
* 也叫testRoot
* 就是我再去create一次
* 我现在再去创建一次
* 同样的节点/testRoot
* 值发生变化"testRoot11"
* 现在能创建还是不能创建啊
* 你想一想能不能创建
* 就是这个节点已经有了
* 我现在还想创建还能不能创建
* 你想一想猜一猜
* KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /testRoot
* NodeExists有了一个节点已经重复了
* 所以有节点就不能创建了
* 这样的话有节点就不能创建了
* 咱们就删掉
* delete /testRoot
* 咱们再次去创建一下
* 这种PERSISTENT是持久化的方式
* 返回的内容你也看到了
* 就是/testRoot
* 这就是创建节点
* 然后这个内存就变成了testRoot11
* 也可以看看
* 咱们get /testRoot
* 变成testRoot11这个值了
* 当前testRoot已经有这个节点了
*
*/
// String ret = zk.create("/testRoot", "testRoot11".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// System.out.println(ret);
//创建子节点
/**
* 然后我想创建一个子节点该怎么去写啊
* 你必须要这么去写
* /testRoot/children
* 你必须写testRoot下面的children
* 就是这个children
* 然后还有一个问题就是说
* 咱们先看这个创建子节点吧
* 就是在/testRoot下面再加一个
* 写法就是这样的
* 然后整体的key的value值就是"children data".getBytes()这个
* 然后还是使用OPEN_ACL_UNSAFE这种方式
* 然后EPHEMERAL这块注意
* 这是一个临时节点
* 临时节点我记得是有一个时间的
* 他就会被清空
* 但是我忘了临时节点是多长时间了
* 你可以百度一下查一下
* 应该默认是一分钟还是多久我忘了
* ls /testRoot
* []
* String org.apache.zookeeper.ZooKeeper.create
* 返回一个String
* 这个是一个临时的创建节点
* 在testRoot下创建一个临时节点
*
*/
// zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
String ret = zk.create("/testRoot/children", "children data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
/**
* /testRoot/children
* 其实创建成功了
* 因为你是临时节点
* 直接就清空了
* 时间也太短了
* 直接过长我就忘记了
* 反正刚才咱们已经看到了,他既然是有返回值,既然run as 有返回值
* 这个节点马上又被删除掉了
* 他就是创建完了就直接停了
* 我想起来了
* 是这样的
* 本次会话有效
* 下次会话就没效了
*
*/
System.out.println(ret);
/**
* 咱们在这里sleep10秒
* 就是暂停10秒钟
* 这样去看就能看到效果了
* 这个children就一直还在
* 然后你等我这个程序停了以后
* 我这个child就没了
* 也就是说这个临时节点啊
* 保持这个zk本次会话有效
* 临时会话本次有效
* 他可以去做一个什么事啊
* 好多人去那他做一个分布式锁啊
* zookeeper去做一个分布式锁是为一个临时节点去创建的
* 但是这个就有点底层了
* 简单给大家就解释一下吧
* 它是怎么去做的呢
* 是这样的
*
*
*/
Thread.sleep(10000);
/**
* 比如我现在有C1,C2两个客户端
* C1客户端
*/
/**
* 这里是C2客户端
* 他们想同时操作数据库里的一张表
*/
//获取节点洗信息
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
// System.out.println(zk.getChildren("/testRoot", false));
//修改节点的值
// zk.setData("/testRoot", "modify data root".getBytes(), -1);
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
//判断节点是否存在
// System.out.println(zk.exists("/testRoot/children", false));
//删除节点
// zk.delete("/testRoot/children", -1);
// System.out.println(zk.exists("/testRoot/children", false));
/**
* 做完了之后我就会去做zk的close
* 你建立完连接之后呢要关闭
*/
zk.close();
}
}