Redis学习(三):redis集群之cluster模式下的跨节点的批量操作 II

    xiaoxiao2023-11-18  138

    说明

    在上篇博文《Redis学习(二):redis集群之cluster模式下的跨节点的批量操作 I》中,对Jedis客户端的JedisCluster类源码进行了学习,了解了Jedis对cluster操作原理,其中有两个关键类JedisClusterConnectionHandler和JedisClusterInfoCache,在JedisClusterInfoCache类中有两个关键变量Map<String,JedisPool> nodes 和 Map<Integer, JedisPool> slots,这两个变量主要存储了cluster集群的主节点信息和其负责的槽点信息。在网上的资料查询资料后发现,对redis cluster进行批量操作主要以pipeline的方式实现。

    本篇博文将结合之前学习源码获取的知识和pipeline的方式,实现cluster模式下的跨节点批量操作。这里主要是通过对key进行区分,以通过CRC16算法获取的slot所在同一个节点的key进行区分,然后分别通过pipeline方式获取后,再进行聚合返回。所以代码中如何获取slot所对应的Jedis则成为关键步骤。

    这里总结两种方式,一种是通过反射从JedisCluster类中获取connectionHandler对象属性,再通过此对象以反射方式获取cache对象属性,通过cache的slots变量获取对应的JedisPool进行分类。另外一种方式则是通过JedisCluster对象的getClusterNodes()获取集群节点的信息,然后通过TreeMap<Long, String> map将主节点负责的槽点位置进行存储,通过slot获取节点信息来获取JedisPool。

    以上两种方式本质上没有区别,都是以key的slot值得到对应的节点的JedisPool方式将key进行分类,主要是在获取JedisPool的方式上有所差别。

    正文

    反射

    创建RedisClusterManager类,该类有以下变量:

    public static final String NODES_ADDR = ""; private static final JedisCluster cluster; private static final Field CONNECTION_HANDLER; private static final Field CACHE_INFO; private static final JedisSlotBasedConnectionHandler connectionHandler; private static final JedisClusterInfoCache clusterInfoCache;

    通过类对象和变量名称获取类的变量对象Field,再通过反射获取其值。关键代码为:

    CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler"); CACHE_INFO = getField(JedisClusterConnectionHandler.class, "cache"); connectionHandler = getValue(cluster, CONNECTION_HANDLER); clusterInfoCache = getValue(connectionHandler, CACHE_INFO); private static Field getField(Class<?> clazz, String fieldName) { try { Field field = clazz.getDeclaredField(fieldName); field.setAccessible(true); return field; } catch (NoSuchFieldException e) { throw new RuntimeException("cannot find or access field '" + fieldName + "' from " + clazz.getName()); } } private static <T> T getValue(Object object, Field field) { try { return (T) field.get(object); } catch (IllegalAccessException e) { throw new RuntimeException(); } }

    获取到JedisClusterInfoCache 对象后,在进行批量操作时,就可以根据key计算其slot值,得到对应的JedisPool,对key进行分类,然后以pipeline的方式获取值。

    Map<JedisPool, List<String>> keysToPoolMap = new HashMap<>(); for (String key : keys) { int slot = JedisClusterCRC16.getSlot(key); JedisPool jedisPool = clusterInfoCache.getSlotPool(slot); if (keysToPoolMap.containsKey(jedisPool)) { keysToPoolMap.get(jedisPool).add(key); } else { List<String> keyList = new ArrayList<>(); keysToPoolMap.put(jedisPool, keyList); keyList.add(key); } }

    TreeMap

    该方式类似于JedisClusterInfoCache 的nodes和slots,slots是将每个slot值所对应的JedisPool存储起来,这里使用TreeMap<Long, String>方式,将每个主节点负责槽点的起始值保存起来,通过key的slot值找到对应的节点信息host,再通过host得到对应的JedisPool。

    创建RedisClusterManager2类,该类有以下变量:

    private final JedisCluster cluster; private final TreeMap<Long, String> slotToHost; private final ReentrantReadWriteLock rwl; private final Lock r; private final Lock w; private volatile boolean renewSlotToHost;

    初始化方法init()中,通过cluster的getClusterNodes()获取集群节点信息,然后设置map的值。

    private void init() { this.w.lock(); Map<String, JedisPool> map = cluster.getClusterNodes(); Iterator iterator = map.values().iterator(); Jedis jedis = null; try { this.reset(); while (true) { if (!iterator.hasNext()) { return; } JedisPool jedisPool = (JedisPool) iterator.next(); jedis = jedisPool.getResource(); initMap(jedis); break; } } catch (Exception e) { throw new RuntimeException(e); } finally { if (jedis != null) { jedis.close(); } this.w.unlock(); } }

    在以上代码中,通过得到的集群节点对应的JedisPool信息,获取Jedis对象,初始化map。

    private void initMap(Jedis jedis) { List<Object> slots = jedis.clusterSlots(); try { for (Object slotInfo : slots) { List slotInfoList = (List) slotInfo; if (slotInfoList.size() > 2) { Long start = (Long) slotInfoList.get(0); Long end = (Long) slotInfoList.get(1); List<Object> hostInfos = (List<Object>) slotInfoList.get(2); String host = SafeEncoder.encode((byte[])((byte[])hostInfos.get(0))) + ":" + ((Long)hostInfos.get(1)).intValue(); slotToHost.putIfAbsent(start, host); slotToHost.putIfAbsent(end, host); } } } catch (Exception e) { throw new RuntimeException(e); }

    在以上代码中,通过Jedis的clusterSlots()方法获取集群的槽点信息,在上篇博文中,我对该方法的返回值进行了简单介绍,在此不再赘述。通过槽点信息获取每个节点负责的槽点值及其地址信息。

    在进行批量操作时,对key进行分类后,以pipeline的方式获取值。区分key的关键代码:

    public Map<Jedis, List<String>> getJedisToKey(List<String> keys) { Map<Jedis, List<String>> res = new HashMap<>(); try { Map<String, Jedis> hostToJedis = new HashMap<>(); Map<String, JedisPool> hostToPool = cluster.getClusterNodes(); int retry = 0; for (String key : keys) { int slot = JedisClusterCRC16.getSlot(key); Map.Entry<Long, String> hostMap = getSlotHost(Long.valueOf(slot)); if (hostMap != null) { String host = hostMap.getValue(); Jedis jedis = hostToJedis.get(host); if (jedis == null) { JedisPool jedisPool = hostToPool.get(host); // 当集群节点发生变动时,可以进行重试获取,这里允许重试两次 if (jedisPool == null) { if (retry > 1) { break; } this.rediscovery(); retry++; continue; } jedis = jedisPool.getResource(); hostToJedis.put(host, jedis); } List<String> keysOfJedis = res.get(jedis); if (keysOfJedis == null) { keysOfJedis = new ArrayList<>(); res.put(jedis, keysOfJedis); } keysOfJedis.add(key); } } } catch (Exception e) { logger.error("assign key to redis failed", e); } return res; }

    至此,该种方式的核心代码结束。

    这两种方式的主要区别是反射依托了JedisClusterInfoCache 类实现,而通过TreeMap存储方式更像JedisClusterInfoCache 类一样,可以构造一个专门的类来实现。

    这里只是简单地实现了跨节点的批量操作,在实现上可能存在缺陷,并且对两种方式的执行效率没有进行比较,无法判定哪种方式更好。

    对redis的操作,一般都是以Jedis作为客户端,但在集群中,该客户端会有些不足。在Spring5中,spring官方用Lettuce客户端替换了Jedis,Lettuce客户端底层基于Netty实现,支持同步,异步操作,并且连接是线程安全的,可以在多线程中共享。但是相比之下,Lettuce比Jedis更“重量级”。

    有关Lettuce更多相关内容详见: https://lettuce.io/core/release/reference/index.html#overview.requirements https://github.com/lettuce-io/lettuce-core

    参考资料: https://blog.csdn.net/Vi_NSN/article/details/78704587 https://blog.csdn.net/Vi_NSN/article/details/78704587

    项目源码:https://github.com/Edenwds/redis_study/tree/master/rediscluster

    最新回复(0)