spring结合hbase

    xiaoxiao2022-07-13  155

    最近重构了一波代码,使用spring管理hbase实例。

    依赖

    <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop</artifactId> <version>2.5.0.RELEASE</version> </dependency>

    spring-hbase.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:hdp="http://www.springframework.org/schema/hadoop"
           xmlns:p="http://www.springframework.org/schema/p"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop-2.3.xsd">
        <!-- HDFS configuration -->
        <hdp:configuration id="hadoopConfiguration">
            fs.defaultFS="hdfs://localhost:9000"
        </hdp:configuration>
        <!-- HBase connection configuration -->
        <hdp:hbase-configuration id="hbaseConfiguration" delete-connection="false" zk-quorum="hdp3,hdp4,hdp5" zk-port="2181"/>
        <!-- FIX: the class attribute previously contained pasted prose
             ("…HbaseTemplate为依赖带的类,"), which is not a valid class name
             and would fail bean instantiation. -->
        <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"
              p:configuration-ref="hbaseConfiguration" />
        <bean id="leafhbaseTemplate" class="com.hbase.LeafHbaseTemplate"
              p:configuration-ref="hbaseConfiguration"
              p:zkNode="/hbase-unsecure" />
    </beans>
    配置说明
    zk-quorum:zookeeper的IP地址zk-port:zookeeper的端口class:配置具体的类,org.springframework.data.hadoop.hbase.HbaseTemplate为依赖带的类,com.hbase.LeafHbaseTemplate为我自己定义的类 package com.hbase; import com.google.gson.Gson; import com.hbase.annotation.FamilyField; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.data.hadoop.hbase.HbaseTemplate; import org.springframework.data.hadoop.hbase.RowMapper; import org.springframework.data.hadoop.hbase.TableCallback; import org.springframework.util.Assert; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; /** * 重写baseTemplate类 * 通过id查 * 支持批量查 * 通过id写 * <p> * 不再是通过rowkey * @created by ycc * @since 2019-04-22 */ @Slf4j public class LeafHbaseTemplate<T> extends HbaseTemplate { protected String tableName; protected String zkNode; protected int[] retryInterval = new int[]{1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192}; /** * 重写get方法,可以传入id,自动通过id生成rowkey, * 传入的rowName为id * * @param tableName * @param rowName * @param familyName * @param qualifier * @param mapper * @param <T> * @return */ @Override public <T> T get(String tableName, final String rowName, final String familyName, final String qualifier, final RowMapper<T> mapper) { return execute(tableName, new TableCallback<T>() { @Override public T doInTable(HTableInterface htable) throws Throwable { Get get = new Get(rowName.getBytes(getCharset())); if (familyName != null) { byte[] family = familyName.getBytes(getCharset()); if (qualifier != null) { get.addColumn(family, qualifier.getBytes(getCharset())); } else { get.addFamily(family); } } Result result = htable.get(get); return mapper.mapRow(result, 0); } }); } /** * hbase通过批量的rowkey获取批量的结果 * * @param tableName * @param ids * @param mapper * @return */ public List<T> gets(String tableName, Collection<String> ids, final 
RowMapper<T> mapper) { return gets(tableName, ids, null, null, mapper); } public <T> List<T> gets(String tableName, final Collection<String> ids, final String familyName, final String qualifier, final RowMapper<T> mapper) { return execute(tableName, new TableCallback<List<T>>() { @Override public List<T> doInTable(HTableInterface htable) throws Throwable { List<Get> gets = new LinkedList<>(); List<T> tList = new LinkedList<>(); for (String id : ids) { String rowName = genRowKey(id); Get get = new Get(rowName.getBytes(getCharset())); if (familyName != null) { byte[] family = familyName.getBytes(getCharset()); if (qualifier != null) { get.addColumn(family, qualifier.getBytes(getCharset())); } else { get.addFamily(family); } } gets.add(get); } Result[] results = htable.get(gets); for (Result result : results) { if (result.listCells() != null && result.listCells().size() > 1) { tList.add(mapper.mapRow(result, 0)); } } return tList; } }); } /** * 重写put方法,传入id * * @param tableName * @param rowName * @param familyName * @param qualifier * @param value */ @Override public void put(String tableName, final String rowName, final String familyName, final String qualifier, final byte[] value) { Assert.hasLength(rowName); Assert.hasLength(familyName); Assert.hasLength(qualifier); Assert.notNull(value); execute(tableName, new TableCallback<Object>() { @Override public Object doInTable(HTableInterface htable) throws Throwable { String rowkey = genRowKey(rowName); Put put = new Put(rowkey.getBytes(getCharset())).add(familyName.getBytes(getCharset()), qualifier.getBytes(getCharset()), value); htable.put(put); return null; } }); } public void puts(List<T> tList, int retryIntervalUnit, String tableName, String rowkey) { List<Put> puts = new ArrayList<>(tList.size()); for (T t : tList) { try { puts.add(getPut(rowkey, t)); } catch (IllegalAccessException e) { e.printStackTrace(); } } puts(puts, retryIntervalUnit, tableName); } private Put getPut(String rowkey, T t) throws 
IllegalAccessException { Field[] fields = t.getClass().getDeclaredFields(); Put put = new Put(Bytes.toBytes(rowkey)); for (Field field : fields) { field.setAccessible(true); String family = "p"; Object o = field.get(t); if (field.getAnnotation(FamilyField.class) != null) { family = field.getAnnotation(FamilyField.class).value(); } if (o != null) { put.addColumn(Bytes.toBytes(family), Bytes.toBytes(field.getName()), Bytes.toBytes(new Gson().toJson(o))); } } return put; } /** * 批量写入 * * @param puts * @param retryIntervalUnit * @param tableName */ public void puts(final List<Put> puts, final int retryIntervalUnit, String tableName) { execute(tableName, new TableCallback<Object>() { @Override public Object doInTable(HTableInterface htable) { awaitFlushTerminate(puts, retryIntervalUnit, htable); return null; } }); } private void awaitFlushTerminate(List<Put> puts, int retryIntervalUnit, Table table) { int i = 0; log.info("flush to hbase start..."); while (!flushToHBase(puts, table)) { log.info("The {} flush start", i); try { i = i == retryInterval.length ? 
i - 1 : i; Thread.sleep(retryIntervalUnit * retryInterval[i++]); } catch (InterruptedException e) { log.error("[HBASE] flush thread error : ", e); } } log.info("flush to hbase end, size : {}", puts.size()); } private boolean flushToHBase(List<Put> puts, Table table) { try { log.info("batch size: {}", puts.size()); Object[] results = new Object[puts.size()]; table.batch(puts, results); return true; } catch (IOException | InterruptedException e) { log.error("[HBASE] flush exception : ", e); return false; } } @Override public void delete(String tableName, final String rowName, final String familyName, final String qualifier) { Assert.hasLength(rowName); Assert.hasLength(familyName); execute(tableName, new TableCallback<Object>() { @Override public Object doInTable(HTableInterface htable) throws Throwable { String rowkey = genRowKey(rowName); Delete delete = new Delete(rowkey.getBytes(getCharset())); byte[] family = familyName.getBytes(getCharset()); if (qualifier != null) { delete.deleteColumn(family, qualifier.getBytes(getCharset())); } else { delete.deleteFamily(family); } htable.delete(delete); return null; } }); } /** * rowkey 生成方式 * 如果rowkey的生成算法不通,可以在具体表对应的类中重写算法 * * @param id * @return */ protected String genRowKey(String id) { return id; } public String getTableName() { return tableName; } /** * spring-hbase.xml 配置tableName * * @param tableName */ public void setTableName(String tableName) { this.tableName = tableName; } public String getZkNode() { return zkNode; } public void setZkNode(String zkNode) { this.zkNode = zkNode; getConfiguration().set("zookeeper.znode.parent", zkNode); } }

    自己定义的LeafHbaseTemplate,重写了get方法,主要是rowkey的生成方式修改。增加了put方法,支持传入一个实体,原来的put方法在实际使用中不方便。

    package com.hbase.rowmapper;

    import com.alibaba.fastjson.JSONObject;
    import com.google.gson.Gson;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.springframework.data.hadoop.hbase.RowMapper;

    import java.lang.reflect.Field;
    import java.lang.reflect.ParameterizedType;
    import java.lang.reflect.Type;
    import java.text.ParseException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    /**
     * Base RowMapper that copies an HBase {@link Result} into an entity by
     * reflection: each cell's qualifier names the target field and the cell
     * value is parsed according to the field's declared type.
     *
     * @created by ycc
     * @since 2019-05-28
     */
    @Slf4j
    public abstract class LeafRowMapper<T> implements RowMapper {

        // Gson is thread-safe; shared across calls.
        private Gson gson = new Gson();

        /**
         * Populates {@code t} from {@code result} via reflection.
         *
         * @param result HBase read result; rows without cells are ignored
         * @param t      target entity to fill in place
         * @throws NoSuchFieldException   if a qualifier has no matching field
         * @throws IllegalAccessException if a field cannot be set
         * @throws ClassNotFoundException if a List element type cannot be loaded
         */
        public void getValue(Result result, T t) throws NoSuchFieldException, IllegalAccessException, ClassNotFoundException {
            List<Cell> cells = result.listCells();
            // FIX: listCells() returns null for an empty Result; guard against NPE.
            if (cells == null) {
                return;
            }
            for (Cell cell : cells) {
                String qualifier = Bytes.toString(cell.getQualifier());
                String value = Bytes.toString(cell.getValue());
                if (StringUtils.isEmpty(value)) {
                    continue;
                }
                Field field = t.getClass().getDeclaredField(qualifier);
                field.setAccessible(true);
                // FIX: field.getClass() is always java.lang.reflect.Field, so
                // every type branch below was dead code and all values fell
                // through to Gson. The declared type comes from getType().
                Class<?> fieldClass = field.getType();
                // FIX: Field.set takes (target, value) — the original had the
                // arguments reversed in every branch.
                if (fieldClass.equals(String.class)) {
                    field.set(t, value);
                } else if (fieldClass.equals(boolean.class) || fieldClass.equals(Boolean.class)) {
                    field.set(t, Boolean.valueOf(value));
                } else if (fieldClass.equals(long.class) || fieldClass.equals(Long.class)) {
                    field.set(t, Long.valueOf(value));
                } else if (fieldClass.equals(short.class) || fieldClass.equals(Short.class)) {
                    field.set(t, Short.valueOf(value));
                } else if (fieldClass.equals(double.class) || fieldClass.equals(Double.class)) {
                    field.set(t, Double.valueOf(value));
                } else if (fieldClass.equals(float.class) || fieldClass.equals(Float.class)) {
                    // FIX: was Double.valueOf, which cannot be assigned to a float field.
                    field.set(t, Float.valueOf(value));
                } else if (fieldClass.equals(int.class) || fieldClass.equals(Integer.class)) {
                    // FIX: was Double.valueOf, which cannot be assigned to an int field.
                    field.set(t, Integer.valueOf(value));
                } else if (fieldClass.equals(JSONObject.class)) {
                    field.set(t, JSONObject.parseObject(value));
                } else if (fieldClass.equals(List.class)) {
                    Type type = field.getGenericType();
                    if (type instanceof ParameterizedType) {
                        ParameterizedType pType = (ParameterizedType) type;
                        Class<?> argType = (Class<?>) pType.getActualTypeArguments()[0];
                        field.set(t, fromSourceList(gson.fromJson(value, List.class), Class.forName(argType.getName())));
                    } else {
                        field.set(t, fromSourceList(gson.fromJson(value, List.class), field.getType()));
                    }
                } else {
                    field.set(t, gson.fromJson(value, fieldClass));
                }
            }
        }

        /** Converts a Gson-decoded list; map elements become {@code aClass} instances. */
        private List fromSourceList(List<Object> properties, Class<?> aClass) {
            if (properties == null) {
                return null;
            }
            List result = new ArrayList();
            for (Object property : properties) {
                try {
                    if (property instanceof java.util.Map) {
                        result.add(fromSourceMap((Map<String, Object>) property, aClass));
                    } else {
                        result.add(property);
                    }
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }
            }
            return result;
        }

        /**
         * Wraps a source map into an instance of {@code aClass}, copying
         * matching keys into fields by name.
         * NOTE(review): fields whose type is none of the handled primitives,
         * String or JSONObject are silently skipped — confirm this is intended.
         *
         * @param sourceMap es search sourceMap
         * @param aClass    class type
         * @return a populated instance (empty if sourceMap is null)
         * @throws IllegalAccessException if a field cannot be set
         * @throws InstantiationException if aClass has no accessible no-arg ctor
         * @throws ParseException         declared for compatibility
         */
        private static <E> E fromSourceMap(Map<String, Object> sourceMap, Class<?> aClass) throws IllegalAccessException, InstantiationException {
            E t = (E) aClass.newInstance();
            Field[] fields = aClass.getDeclaredFields();
            if (sourceMap == null) {
                return t;
            }
            for (Field field : fields) {
                field.setAccessible(true);
                // FIX: same getClass()/getType() and argument-order bugs as getValue().
                Class<?> fieldClass = field.getType();
                String fieldName = field.getName();
                if (!sourceMap.containsKey(fieldName)) {
                    continue;
                }
                String value = String.valueOf(sourceMap.get(fieldName));
                if (fieldClass.equals(String.class)) {
                    field.set(t, value);
                } else if (fieldClass.equals(boolean.class) || fieldClass.equals(Boolean.class)) {
                    field.set(t, Boolean.valueOf(value));
                } else if (fieldClass.equals(long.class) || fieldClass.equals(Long.class)) {
                    field.set(t, Long.valueOf(value));
                } else if (fieldClass.equals(short.class) || fieldClass.equals(Short.class)) {
                    field.set(t, Short.valueOf(value));
                } else if (fieldClass.equals(double.class) || fieldClass.equals(Double.class)) {
                    field.set(t, Double.valueOf(value));
                } else if (fieldClass.equals(float.class) || fieldClass.equals(Float.class)) {
                    // FIX: was Double.valueOf.
                    field.set(t, Float.valueOf(value));
                } else if (fieldClass.equals(int.class) || fieldClass.equals(Integer.class)) {
                    // FIX: was Double.valueOf.
                    field.set(t, Integer.valueOf(value));
                } else if (fieldClass.equals(JSONObject.class)) {
                    field.set(t, JSONObject.parseObject(value));
                }
            }
            return t;
        }
    }

    增加了将Result解析为具体实体的解析类——LeafRowMapper,主要封装了类型转换的函数。

    package com.hbase.annotation;

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    /**
     * Declares which HBase column family an entity field is written to.
     * Fields without this annotation use the default family "p".
     *
     * @created by ycc
     * @since 2019-05-28
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface FamilyField {
        // Column family name for the annotated field.
        String value();
    }

    增加了一个注解类,用于指定某个字段应该写入到哪个列簇。



    spring结合hbase有个坑:每次获取数据都需要重新创建connection。查看spring-data-hadoop源码,在HbaseUtils类中获取具体的hbaseTable:

    public static HTableInterface getHTable(String tableName, Configuration configuration, Charset charset, HTableInterfaceFactory tableFactory) {
        // Reuse the table instance already bound to the current thread, if any.
        if (HbaseSynchronizationManager.hasResource(tableName)) {
            return (HTable) HbaseSynchronizationManager.getResource(tableName);
        }
        HTableInterface t = null;
        try {
            if (tableFactory != null) {
                // A configured factory decides how tables (and connections) are created.
                t = tableFactory.createHTableInterface(configuration, tableName.getBytes(charset));
            } else {
                // No factory configured: a brand-new HTable — and hence a new
                // connection — is created on every call. This is the costly path.
                t = new HTable(configuration, tableName.getBytes(charset));
            }
            return t;
        } catch (Exception ex) {
            throw convertHbaseException(ex);
        }
    }

    开头的判断不是特别了解(之后有空再深入了解),经过调试发现,每次HbaseSynchronizationManager.hasResource(tableName)都为false,这时候会判断tableFactory,tableFactory为null,这个在spring-data-hadoop里面并没有进行实例化,需要在spring-hbase.xml里面实例化。所以每次请求都会new HTable()。 如何解决这个问题?我们可以编写一个SkynetHTableFactory继承HTableFactory。具体代码如下:

    package com.hbase;

    import lombok.extern.slf4j.Slf4j;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;

    import java.io.IOException;

    /**
     * HTableFactory that hands out HTable instances backed by one shared
     * Connection, so HbaseTemplate does not open a fresh connection per request.
     *
     * @created by ycc
     * @since 2019-05-29
     */
    @Slf4j
    public class SkynetHTableFactory extends HTableFactory {

        // Shared connection; volatile so the double-checked lazy init below is safe.
        private volatile Connection connection = null;
        private Configuration configuration = null;

        public SkynetHTableFactory(Configuration configuration) {
            this.configuration = configuration;
        }

        /**
         * Creates an HTable bound to the shared connection.
         *
         * @param config    ignored; the constructor-injected configuration is used
         * @param tableName table name bytes
         */
        @Override
        public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
            try {
                // FIX: if setZkNode was never invoked (or failed), connection
                // stayed null and new HTable(..., null) broke at runtime.
                // Lazily build the shared connection as a fallback.
                if (connection == null) {
                    synchronized (this) {
                        if (connection == null) {
                            connection = HConnectionManager.createConnection(configuration);
                        }
                    }
                }
                return new HTable(TableName.valueOf(tableName), connection);
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        }

        /**
         * Sets the ZK parent znode and eagerly creates the shared connection.
         * Injected from spring-hbase.xml via p:zkNode.
         */
        public void setZkNode(String zkNode) {
            try {
                configuration.set("zookeeper.znode.parent", zkNode);
                connection = HConnectionManager.createConnection(configuration);
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    并且修改spring-hbase.xml如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:hdp="http://www.springframework.org/schema/hadoop"
           xmlns:p="http://www.springframework.org/schema/p"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop-2.3.xsd">
        <!-- HDFS configuration -->
        <hdp:configuration id="hadoopConfiguration">
            fs.defaultFS="hdfs://localhost:9000"
        </hdp:configuration>
        <!-- HBase connection configuration -->
        <hdp:hbase-configuration id="hbaseConfiguration" delete-connection="false" zk-quorum="hdp3,hdp4,hdp5" zk-port="2181"/>
        <!-- Table factory holding one shared connection; injected into both
             templates so getHTable() no longer creates a connection per call -->
        <bean id="tableFactory" class="com.hbase.SkynetHTableFactory" p:zkNode="/hbase-unsecure">
            <constructor-arg ref="hbaseConfiguration"/>
        </bean>
        <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"
              p:configuration-ref="hbaseConfiguration"
              p:tableFactory-ref="tableFactory" />
        <bean id="leafhbaseTemplate" class="com.hbase.LeafHbaseTemplate"
              p:configuration-ref="hbaseConfiguration"
              p:tableFactory-ref="tableFactory" />
    </beans>
    最新回复(0)