分布式锁-zookeeper-lock

    xiaoxiao2025-12-25  10

    第一次写博客,写的不对的或者不明了的请大家多多指正

    ===object pool 介绍====

    主要类、接口介绍:

    接口 PooledObjectFactory 池对象工厂

    用来创建池对象, 

    将不用的池对象进行钝化(passivateObject), 

    对要使用的池对象进行激活(activeObject), 

    对池对象进行验证(validateObject), 

    对有问题的池对象进行销毁(destroyObject)等工作

    PooledObject<T> makeObject(); 生成对象

    void activateObject(PooledObject<T> p); 激活对象

    void destroyObject(PooledObject<T> p); 销毁对象

    ......

    抽象类 BasePooledObjectFactory<T> implements PooledObjectFactory<T>

    接口 ObjectPool : 对象池

    T borrowObject(); 从池中借出一个对象

    void returnObject(T obj); 归还对象

    ......

    GenericObjectPool 创建对象池

    实现的三个接口: ObjectPool 、 GenericObjectPoolMXBean 、 UsageTracking

    主要问题解析:

    如何实现公平锁:

    pool底层维护了一个双端队列LinkedBlockingDeque

    该双端队列使用ReentrantLock实现了可重入锁

    因此底层可以修改ReentrantLock的公平和非公平机制实现pool的公平锁

    初始化双端队列时可以设置GenericObjectPoolConfig.fairness为true来实现公平锁,默认为false(非公平锁)

    双端队列LinkedBlockingDeque的长度默认为Integer.MAX_VALUE

    borrowObject()怎么实现的:

    从LinkedBlockingDeque双端队列中取第一个对象LinkedBlockingDeque.pollFirst(),

    如果没有取到-->调用指定的factory.makeObject()-->factory.activateObject(p)-->return 

    如果取到了-->factory.activateObject(p)-->return

    returnObject(T obj)怎么实现的:

    1:锁定对象,验证对象的状态

    2:factory.passivateObject(p) 钝化对象(空实现)

    3:p.deallocate() 释放对象

    4:如果对象池closed-->调用指定的factory.destroyObject(p);

      如果对象池没有closed-->LinkedBlockingDeque.addFirst(p)

    ====org.apache.curator.framework包简单介绍====

    主要类、接口介绍:

    接口 CuratorFramework ZooKeeper客户端开源工具,主要提供了对客户端到服务的连接管理和连接重试机制,以及一些扩展功能

    void start();

    void close();

    CuratorFrameworkState getState();

    boolean isStarted();

    CreateBuilder create();

    ......

    类 ExponentialBackoffRetry 创建目录或者删除目录的重试策略

    类 LockInternals 是所有申请锁与释放锁的核心实现

    String attemptLock(time, unit, getLockNodeBytes()) 尝试获得锁 返回锁定的路径

    boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) 循环等待尝试加锁

    void releaseLock(String lockPath)

    void deleteOurPath(String ourPath)

    类 StandardLockInternalsDriver driver

    String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) 创建节点

    ourPath = client.create().creatingParentContainersIfNeeded() 

    .withProtection()

    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

    .forPath(path, lockNodeBytes) 创建临时顺序节点路径

    类 CreateBuilderImpl client.create()方法返回的客户端实现类实例

    String forPath(final String givenPath, byte[] data)

    String pathInForeground(final String path, final byte[] data)

    (生成目录!!)createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode)

    类 InterProcessMutex 互斥锁 一个可重入锁,提供分布式锁的入口服务

    锁的定义:ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap(); 将线程对象和锁对象(线程、路径、锁的数量)关联

    void acquire() 无限等待,直到获取到锁(默认time=-1,unit=null)

    boolean acquire(long time, TimeUnit unit) 有限等待,在规定的时间内获取锁

    1:boolean internalLock(long time, TimeUnit unit)

    当前线程在缓存中是否已经存在一把锁,如果存在,将锁的次数加1直接返回

    如果不存在,执行第2步

    2:(主要方法!!)String LockInternals.attemptLock(time, unit, getLockNodeBytes()) 尝试获得锁 返回锁定的路径

    StandardLockInternalsDriver.createsTheLock(client, path, localLockNodeBytes) 创建节点

    client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);

    在指定的节点路径下,顺序创建子节点(按照请求顺序)

    注意:对每个请求都会创建一个节点

    LockInternals.internalLockLoop(startMillis, millisToWait, ourPath) 循环等待尝试加锁

    a、根据basePath获取子节点,按照创建时间升序排列(最先申请的在最前)

    b、查看子节点中是否包含传入的节点,并且下标是否为0(acquire区分maxLeases)

    如果下标为0,直接返回锁定成功,否则进行c步骤

    c、启动一个自定义watcher后台监听传入的节点的状态,一旦监听到节点数据变更或删除,则就直接 notifyFromWatcher();

    同时根据millisToWait(acquire方法的不同参数区分)判断是否wait

    d、如果出现异常,删除节点路径

    void release() 判断是否是当前线程,或者非当前线程。最终会根据线程号找到对应的path路径,然后直接删除该临时节点

    LockInternals.releaseLock(String lockPath)

    LockInternals.deleteOurPath(String ourPath)

    client.delete().guaranteed().forPath(ourPath);

    扩展:

    方法 Thread.currentThread()

    方法 AtomicInteger lockCount.incrementAndGet();

    类 org.apache.curator.RetryLoop 

    =====具体实现代码(伪代码)======

    =====1、初始化对象池==========

    @Component("curatorPoolFactory") public class CuratorPoolFactory extends BasePooledObjectFactory<CuratorFramework> { @Value("#{vipProperties['zk.address']}") private String zkHost; @Value("#{vipProperties['zk.namespace']}") private String zkNamespace; @Override public CuratorFramework create() throws Exception { CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() .namespace(zkNamespace) .connectString(zkHost) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); curatorFramework.start(); return curatorFramework; } @Override public PooledObject<CuratorFramework> wrap(CuratorFramework obj) { return new DefaultPooledObject<CuratorFramework>(obj); } } <!--zk client pool --> <bean id="curatorPool" class="org.apache.commons.pool2.impl.GenericObjectPool" destroy-method="close"> <constructor-arg ref="curatorPoolFactory" /> <property name="maxTotal" value="5" /> <property name="maxIdle" value="2" /> </bean> =====2、判断是否可以获得锁======

    org.apache.commons.pool2.impl.GenericObjectPool curatorPool = (GenericObjectPool<CuratorFramework>) AppContext.getApplicationContext().getBean("curatorPool"); org.apache.curator.framework.CuratorFramework curatorFramework = curatorPool.borrowObject(); org.apache.curator.CuratorZookeeperClient zookeeperClient = curatorFramework.getZookeeperClient(); org.apache.zookeeper.ZooKeeper zooKeeper = zookeeperClient.getZooKeeper(); org.apache.zookeeper.data.Stat stat = zooKeeper.(final String zkPath, Watcher watcher); stat != null && stat.getNumChildren() > 0 ==> 已锁 直接返回 ====3、可以得到锁====

    org.apache.commons.pool2.impl.GenericObjectPool curatorPool = (GenericObjectPool<CuratorFramework>) AppContext.getApplicationContext().getBean("curatorPool"); try: org.apache.curator.framework.CuratorFramework curatorFramework = curatorPool.borrowObject(); org.apache.curator.framework.recipes.locks.InterProcessMutex lock = new InterProcessMutex(curatorFramework, zkPath) lock.acquire(); #DO SOMETHING# finally: lock.release(); curatorPool.returnObject(curatorFramework);

    总结:

    实现分布式的重点是Stat类

    实现锁的中点是InterProcessMutex类

    最新回复(0)