Zookeeper 实现分布式节点下的配置文件统一管理和分布式锁

    xiaoxiao2025-03-30  20

    原文地址:https://www.jianshu.com/p/f37b14cf89f0

    https://archive.apache.org/dist/     //jars包集合

    一、ZooKeeper 简介

    ZooKeeper 是一个集中式服务,用于维护配置信息,命名,提供分布式同步和提供组服务。 ZooKeeper 的主要应用:1、节点选举;2、配置文件统一管理;3、分布式锁;4、发布与订阅(Dubbo);5、集群管理,集群中保证数据的强一致性,下面我们主要讲配置文件统一管理和分布式锁。

    Zookeeper文件系统

    Zookeeper的每个子目录项如 NameService 都被称作为 znode,和文件系统一样,我们能够自由的增加、删除 znode,在一个 znode 下增加、删除子 znode,唯一的不同在于znode是可以存储数据的。

     

    有四种类型的 znode:

    1、PERSISTENT-持久化目录节点 客户端与 zookeeper 断开连接后,该节点依旧存在。2、PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点 客户端与 zookeeper 断开连接后,该节点依旧存在,只是 zookeeper 给该节点名称进行顺序编号。3、EPHEMERAL-临时目录节点 客户端与 zookeeper 断开连接后,该节点被删除。4、EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点 客户端与 zookeeper 断开连接后,该节点被删除,只是 zookeeper 给该节点名称进行顺序编号。

    二、配置文件统一管理

    1、实现思路

    假如我们需要修改三(或者更多)台服务器上 redis.conf 的配置信息,如果一台一台的去修改,则会加大出错概率,而且也不实际。这时候,我们需要引入Zookeeper(下面简称zk),我们需要知道,zk 中有个 watcher 事件,包括 : EventType:NodeCreated //节点创建 EventType:NodeDataChanged //节点的数据变更 EventType:NodeChildrentChanged //子节点下的数据变更 EventType:NodeDeleted // 节点删除 当我们监听了上面的事件时,事件触发就会被告知。以统一更新 redis.conf 配置文件为例,我们可以实现监听某一个节点的数据更新事件,当DBA更改了该节点的值(一般为 json 串,方便程序解析,例:{"type":"update","url":"ftp:192.168.2.10/config/redis.xml"}),此时我们可以根据 type 的值“update”可知,是需要更新 redis.conf 配置文件,然后根据 url 的值,获取最新的 redis.conf 文件所在的服务器地址。此时,我们可以下载最新配置文件,然后删除原来的 redis.conf 配置文件,最后将最新的配置文件添加到项目中,从而通过重启程序就可以读取到最新的配置了。

    2、代码实现

    这里我们模拟了三个客户端 Client1、Client2、Client3,代码都是一样的。当 zk 节点数据发送变化,就会触发数据更新的事件,从而告知其客户端(必须监听了该事件)。

    package com.imooc.curator.operator; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.retry.RetryNTimes; import org.springframework.util.StringUtils; import java.util.concurrent.CountDownLatch; /** * 使用 zk 的 watch 事件,实现配置文件的统一配置 * @author K. L. Mao * @create 2018/9/8 */ public class Client1 { public CuratorFramework client; public static final String zkServerPath = "192.168.174.10:2181,192.168.174.11:2181,192.168.174.12:2181"; public static final String CONFIG_NODE_PATH = "/super/imooc"; public static final String SUB_PATH = "/redis-config"; public static CountDownLatch countDownLatch = new CountDownLatch(1); /** * 实例化 zk 客户端 */ public Client1(){ /** * curator 连接 zk 的策略:RetryNTimes * n:重试次数 * sleepMsBetweenRetries:每次重试间隔的时间 */ RetryPolicy retryPolicy = new RetryNTimes(3, 5000); client = CuratorFrameworkFactory.builder() .connectString(zkServerPath) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("workspace").build(); // 启动客户端连接 client.start(); } /** * 关闭 zk 客户端 */ public void closeZKClient() { if (client != null){ client.close(); } } public static void main(String[] args) throws Exception { // 实例化 Client1 operator = new Client1(); System.out.println("client1 启动成功"); // 创建节点 // byte[] data = "super".getBytes(); // operator.client.create().creatingParentsIfNeeded() // .withMode(CreateMode.PERSISTENT) // 持久化节点 // .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) // 全权限ACL // .forPath(nodePath, data); // watcher 事件 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁 // operator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath); // NodeCache:监听 CONFIG_NODE_PATH 下的子包数据节点的变更,会触发事件 final PathChildrenCache nodeCache = new PathChildrenCache(operator.client, CONFIG_NODE_PATH, true); // 初始化的时候获取 node 的值并且缓存 nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); // 新增监听器 nodeCache.getListenable().addListener((client, event) -> { // 监听节点更新 if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){ String configNodePath = event.getData().getPath(); if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)){ System.out.println("监听到配置发生变化,节点路径为:" + configNodePath); // 读取节点数据 String data = new String(event.getData().getData(), "UTF-8"); System.out.println("节点" + CONFIG_NODE_PATH + "的数据为" + data); if (!StringUtils.isEmpty(data)){ JSONObject jsonObject = JSON.parseObject(data); String type = jsonObject.getString("type"); String url = jsonObject.getString("url"); if ("add".equals(type)){ System.out.println("监听到新增的配置,准备下载..."); // ... 连接ftp服务器,根据url找到对应的配置 Thread.sleep(500); System.out.println("开始下载新的配置文件,下载路径为<" + url + ">"); // ... 下载配置到你指定的目录 Thread.sleep(1000); System.out.println("下载成功,已经添加到项目中"); }else if ("update".equals(type)){ System.out.println("监听到更新的配置,准备下载..."); // ... 连接ftp服务器,根据url找到对应的配置 Thread.sleep(500); System.out.println("开始下载新的配置文件,下载路径为<" + url + ">"); // ... 下载配置到你指定的目录 Thread.sleep(1000); System.out.println("下载成功"); System.out.println("删除项目中原配置文件..."); Thread.sleep(100); // ... 删除原文件 System.out.println("拷贝配置文件到项目目录..."); // ... 拷贝文件到项目 }else if ("delete".equals(type)){ System.out.println("监听到删除的配置"); System.out.println("删除项目中原配置文件..."); } } } } }); countDownLatch.await(); operator.closeZKClient(); } }

    三、分布式锁

    1、场景

    高并场景下,对共享资源的访问,都需要加锁,分布式的环境下,就需要加分布式锁。如下代码,模拟一个分布式的并发操作。

    Service package com.imooc.curator.service; import org.springframework.stereotype.Service; /** * @author K. L. Mao * @create 2018/9/9 */ @Service public class PayService { private static int COUNT = 100; /** * 高并发下的 count-1 * @return */ public int countLock(){ if (COUNT <= 99){ return -1; } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } COUNT = COUNT - 1; return COUNT; } } Controller package com.imooc.curator.web; import com.imooc.curator.service.PayService; import com.imooc.curator.utils.ZKCurator; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; /** * @author K. L. Mao * @create 2018/9/9 */ @RestController public class PayController { @Autowired private PayService payService; /** * 模拟客户端1 * @return */ @GetMapping("/lock") public int lock(){ return payService.countLock(); } /** * 模拟客户端2 * @return */ @GetMapping("/lock2") public int lock2(){ return payService.countLock(); } }

    当我们在三秒内分别访问 localhost:8080/lock 和 localhost:8080/lock2 时,就会产生2个线程对 COUNT 进行了操作,使其 变为98了。这样是不允许的,那么如何保证这两个线程是依次执行的呢?这时候,我们需要 zk 来实现分布式锁了。

    2、实现思路

    zk 分布式锁

     

    当一个线程尝试访问资源时,我们需要它去获得锁,如果该锁被占用,则等待锁的释放(CountDownLatch 实现);如果该锁未被占用,则创建 zk 节点,从而拥有了该锁,然后执行业务逻辑,最后释放锁(删除节点,触发节点删除事件,执行 CountDownLatch.countDown(),让等待的线程重新去尝试获取锁)。

    3、代码实现

    ZkCuratorConfig :配置 CuratorFramework package com.imooc.curator.config; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author K. L. Mao * @create 2018/9/9 */ @Configuration public class ZkCuratorConfig { @Bean public CuratorFramework curatorFramework(){ // 连接失败会尝试5次连接 每次连接间隔5秒 RetryPolicy retryPolicy = new RetryNTimes(5, 5000); String zkServer = "192.168.174.10:2181"; // sessionTimeoutMs 会话超时时间 CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(zkServer, 10000, 5000, retryPolicy); curatorFramework.start(); return curatorFramework; } } DistributedLock :根据 CuratorFramework 配置分布式锁工具类 package com.imooc.curator.utils; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.CountDownLatch; /** * 分布式锁的实现工具类 * @author K. L. Mao * @create 2018/9/9 */ @Component @Slf4j public class DistributedLock { @Autowired private CuratorFramework client; // 用于挂起当前请求,并且等待上一个分布式锁释放 private static CountDownLatch zkLockLatch = new CountDownLatch(1); // 分布式锁的总节点名 private static final String ZK_LOCK_PROJECT = "/imooc-locks"; // 分布式锁节点 private static final String DISTRIBUTED_LOCK = "/distributed_lock"; @PostConstruct public void init(){ // 使用命名空间 client = client.usingNamespace("ZKLocks-Namespace"); /** * 创建zk锁的总节点 * ZKLocks-Namespace * | * — imooc-locks * | * — distributed_lock */ try { // 如果 ZK_LOCK_PROJECT 目录不存在,需要创建 if (client.checkExists().forPath(ZK_LOCK_PROJECT) == null){ client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) // 存放锁的总节点,所以需要永久性的 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(ZK_LOCK_PROJECT); } // 针对 zk 的分布式锁节点,创建相应的watcher事件监听,来监听其子节点(distributed_lock)的删除事件 addWatcherToLock(ZK_LOCK_PROJECT); }catch (Exception e){ log.error("客户度连接zookeeper服务器错误..."); } } /** * 获得分布式锁 */ public void getLock(){ // 使用死循环,当且仅当上一个锁释放并且当前请求获得锁后才会跳出 while (true){ try { client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) // 存放分布式锁,所以需要临时的 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath(ZK_LOCK_PROJECT + DISTRIBUTED_LOCK); log.info("获得分布式锁"); // 如果锁的节点能够被创建成功,即不存在,则锁没有被占用 return; }catch (Exception e){ log.error("获得分布式锁失败..."); try { // 如果没有获得锁,需要重新设置同步资源值 if (zkLockLatch.getCount() <= 0){ zkLockLatch = new CountDownLatch(1); } // 阻塞进程 zkLockLatch.await(); }catch (InterruptedException e1) { e1.printStackTrace(); } } } } /** * 释放分布式锁 * @return */ public boolean releaseLock(){ try { // 删除分布式锁节点 if (client.checkExists().forPath(ZK_LOCK_PROJECT + DISTRIBUTED_LOCK) != null){ client.delete().forPath(ZK_LOCK_PROJECT + DISTRIBUTED_LOCK); } }catch (Exception e){ e.printStackTrace(); return false; } log.info("分布式锁释放完毕"); return true; } /** * 创建 watcher 监听 * @param path * @throws Exception */ public void addWatcherToLock(String path) throws Exception { PathChildrenCache cache = new PathChildrenCache(client, path, true); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener((client, event) -> { // 只监听子节点的移除事件 if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){ String nodePath = event.getData().getPath(); log.info("上一个会话已经释放锁或者该会话已经断开,节点路径为:" + nodePath); if (nodePath.contains(DISTRIBUTED_LOCK)){ log.info("释放计数器,让当前请求来获得分布式锁..."); zkLockLatch.countDown(); } } }); } }

    init():初始化 DistributedLock,主要是声明命名空间,以及创建存放锁的总节点(永久模型),并且创建 Watcher 监听事件,来监听总节点下的子节点的删除事件。

    addWatcherToLock(String path):给指定路径创建 watcher 监听。如果监听到节点删除事件,则表示当前线程释放了锁,执行CountDownLatch.countDown()操作。

    getLock():死循环的去获取分布式锁,即创建一个临时的节点,表示分布式锁。如果节点已经存在,则会报错,说明获得分布式锁失败,因为有别的线程已经占用了,于是挂起当前线程(CountDownLatch.await())。反之,如果创建成功,则表示获得了分布式锁,跳出死循环。

    releaseLock():释放分布式锁,即删除表示分布式锁的节点。

    Service 添加分布式锁 package com.imooc.curator.service; import com.imooc.curator.utils.DistributedLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author K. L. Mao * @create 2018/9/9 */ @Service public class PayService { @Autowired private DistributedLock distributedLock; private static int COUNT = 100; /** * 分布式锁实现 count-1 * @return */ public int countLock(){ // 获取锁 distributedLock.getLock(); if (COUNT <= 99){ distributedLock.releaseLock(); return -1; } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } COUNT = COUNT - 1; distributedLock.releaseLock(); return COUNT; } }

    此时,保证了只有一个线程 COUNT - 1 的操作,另一个线程获得锁的时候,上一个线程已经执行完了,即 COUNT = 99,从而直接返回了 -1(模拟直接返回前端,直接跳过业务逻辑)。

     

    最新回复(0)