最近重构了一波代码,使用spring管理hbase实例。
自己定义的LeafHbaseTemplate,重写了get方法,主要是rowkey的生成方式修改。增加了put方法,支持传入一个实体,原来的put方法在实际使用中不方便。
package com.hbase.rowmapper; import com.alibaba.fastjson.JSONObject; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.springframework.data.hadoop.hbase.RowMapper; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.text.ParseException; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * @created by ycc * @since 2019-05-28 * */ @Slf4j public abstract class LeafRowMapper<T> implements RowMapper { private Gson gson = new Gson(); /** * 通过反射,将Result中的值set到entity中 * * @return * @throws NoSuchFieldException */ public void getValue(Result result, T t) throws NoSuchFieldException, IllegalAccessException, ClassNotFoundException { List<Cell> cells = result.listCells(); for (Cell cell : cells) { String qualifier = Bytes.toString(cell.getQualifier()); String value = Bytes.toString(cell.getValue()); if (StringUtils.isEmpty(value)) { continue; } Field field = t.getClass().getDeclaredField(qualifier); field.setAccessible(true); Class fieldClass = field.getClass(); if (fieldClass.equals(String.class)) { field.set(value, t); } else if (fieldClass.equals(boolean.class) || fieldClass.equals(Boolean.class)) { field.set(Boolean.valueOf(value), t); } else if (fieldClass.equals(long.class) || fieldClass.equals(Long.class)) { field.set(Long.valueOf(value), t); } else if (fieldClass.equals(short.class) || fieldClass.equals(Short.class)) { field.set(Short.valueOf(value), t); } else if (fieldClass.equals(double.class) || fieldClass.equals(Double.class)) { field.set(Double.valueOf(value), t); } else if (fieldClass.equals(float.class) || fieldClass.equals(Float.class)) { field.set(Double.valueOf(value), t); } else if (fieldClass.equals(int.class) || fieldClass.equals(Integer.class)) { 
field.set(Double.valueOf(value), t); } else if (fieldClass.equals(JSONObject.class)) { field.set(JSONObject.parseObject(value), t); } else if (fieldClass.equals(List.class)) { Type type = field.getGenericType(); if (type instanceof ParameterizedType) { ParameterizedType pType = (ParameterizedType) type; Class<?> argType = (Class<?>) pType.getActualTypeArguments()[0]; field.set(fromSourceList(new Gson().fromJson(value, List.class), Class.forName(argType.getName())), t); } else { field.set(fromSourceList(new Gson().fromJson(value, List.class), field.getType()), t); } } else { field.set(gson.fromJson(value, fieldClass), t); } } } private List fromSourceList(List<Object> properties, Class<?> aClass) { if (properties == null) { return null; } List result = new ArrayList(); for (Object property : properties) { try { if (property instanceof java.util.Map) { result.add(fromSourceMap((Map<String, Object>) property, aClass)); } else { result.add(property); } } catch (Exception e) { log.error(e.getMessage(), e); } } return result; } /** * wrap sourceMap to a Class object specialed * * @param sourceMap es search sourceMap * @param aClass class type * @return a object * @throws IllegalAccessException * @throws InstantiationException * @throws ParseException */ private static <E> E fromSourceMap(Map<String, Object> sourceMap, Class<?> aClass) throws IllegalAccessException, InstantiationException { E t = (E) aClass.newInstance(); Field[] fields = aClass.getDeclaredFields(); if (sourceMap == null) { return t; } for (Field field : fields) { field.setAccessible(true); Class fieldClass = field.getClass(); String fieldName = field.getName(); if (!sourceMap.containsKey(fieldName)) { continue; } String value = String.valueOf(sourceMap.get(fieldName)); if (fieldClass.equals(String.class)) { field.set(value, t); } else if (fieldClass.equals(boolean.class) || fieldClass.equals(Boolean.class)) { field.set(Boolean.valueOf(value), t); } else if (fieldClass.equals(long.class) || 
fieldClass.equals(Long.class)) { field.set(Long.valueOf(value), t); } else if (fieldClass.equals(short.class) || fieldClass.equals(Short.class)) { field.set(Short.valueOf(value), t); } else if (fieldClass.equals(double.class) || fieldClass.equals(Double.class)) { field.set(Double.valueOf(value), t); } else if (fieldClass.equals(float.class) || fieldClass.equals(Float.class)) { field.set(Double.valueOf(value), t); } else if (fieldClass.equals(int.class) || fieldClass.equals(Integer.class)) { field.set(Double.valueOf(value), t); } else if (fieldClass.equals(JSONObject.class)) { field.set(JSONObject.parseObject(value), t); } } return t; } }增加了result到具体实体的解析类–LeafRowMapper。主要封装了转译的函数。
package com.hbase.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Names the HBase column family that an annotated field is written to.
 *
 * <p>NOTE(review): the original note says the default column family is {@code p}; this
 * annotation declares no default, so that fallback is presumably applied by the code
 * that reads the annotation -- confirm against the caller.
 *
 * @created by ycc
 * @since 2019-05-28
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface FamilyField {

    /** The column family name. */
    String value();
}
增加了一个注解类,用于指定某个字段应该写入到哪个列簇。
spring结合hbase有个坑:每次获取数据都会重新创建一次连接(connection)。查看spring-data-hadoop源码可以发现,具体的HTable是在HbaseUtils.class的getHTable方法中获取的:
/**
 * Looks up or creates the table handle for {@code tableName}.
 *
 * <p>A table registered with {@code HbaseSynchronizationManager} under this name is returned
 * as is. Otherwise the table is created by {@code tableFactory} when one is given, or by
 * constructing a new {@code HTable} from {@code configuration} when it is {@code null}.
 *
 * @param tableName     name of the table
 * @param configuration configuration handed to the factory or the {@code HTable} constructor
 * @param charset       charset used to encode {@code tableName} to bytes
 * @param tableFactory  factory used to create the table, may be {@code null}
 * @return the bound or newly created table
 */
public static HTableInterface getHTable(String tableName, Configuration configuration, Charset charset,
        HTableInterfaceFactory tableFactory) {
    if (HbaseSynchronizationManager.hasResource(tableName)) {
        return (HTable) HbaseSynchronizationManager.getResource(tableName);
    }

    try {
        final byte[] rawName = tableName.getBytes(charset);
        if (tableFactory == null) {
            // No factory configured: a fresh HTable is built on every call.
            return new HTable(configuration, rawName);
        }
        return tableFactory.createHTableInterface(configuration, rawName);
    } catch (Exception ex) {
        // Any failure while creating the table is translated by convertHbaseException.
        throw convertHbaseException(ex);
    }
}
开头的判断不是特别了解(之后有空在深入了解),经过调试发现,每次HbaseSynchronizationManager.hasResource(tableName)都为false,这时候会判断tableFactory,tableFactory为null,这个在spring-data-hadoop里面并没有进行实例化。需要在spring-hbase.xml里面实例化。所以每次请求都会new HTable()。 如何解决这个问题?我们可以重写一个SkynetTableFactory继承HTableFactory。具体代码如下:
package com.hbase;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.Closeable;
import java.io.IOException;

/**
 * {@link HTableFactory} that creates every table on top of one shared {@link Connection}
 * instead of building a new connection per table.
 *
 * <p>The connection is created when {@link #setZkNode(String)} is called and released by
 * {@link #close()}.
 *
 * <p>NOTE(review): whether the Spring container calls {@code close()} on shutdown depends on
 * the Spring version and bean definition; declare {@code destroy-method="close"} if it does not.
 *
 * @created by ycc
 * @since 2019-05-29
 */
@Slf4j
public class SkynetHTableFactory extends HTableFactory implements Closeable {

    private final Configuration configuration;
    private Connection connection = null;

    /**
     * @param configuration HBase configuration used to create the shared connection;
     *                      note that {@link #setZkNode(String)} modifies it in place
     * @throws IllegalArgumentException if {@code configuration} is {@code null}
     */
    public SkynetHTableFactory(Configuration configuration) {
        if (configuration == null) {
            throw new IllegalArgumentException("configuration must not be null");
        }
        this.configuration = configuration;
    }

    /**
     * Creates a table handle backed by the shared connection.
     *
     * @param config    ignored; the connection was built from the constructor's configuration
     * @param tableName name of the table as bytes
     * @return a new table handle using the shared connection
     * @throws IllegalStateException if no connection is available because
     *                               {@link #setZkNode(String)} was not called or failed
     * @throws RuntimeException      wrapping the {@link IOException} raised while creating the table
     */
    @Override
    public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
        Connection shared = connection;
        if (shared == null) {
            // Fail with a clear message instead of passing null into HTable.
            throw new IllegalStateException(
                    "No shared HBase connection: setZkNode(...) was not called or failed to connect");
        }
        try {
            return new HTable(TableName.valueOf(tableName), shared);
        } catch (IOException ioe) {
            throw new RuntimeException(ioe);
        }
    }

    /**
     * Sets {@code zookeeper.znode.parent} on the configuration and (re)creates the shared
     * connection. A connection created by an earlier call is closed once the new one exists.
     * A failure to connect is logged and leaves the previous state in place.
     *
     * @param zkNode the parent znode of the HBase cluster in ZooKeeper
     */
    public void setZkNode(String zkNode) {
        try {
            configuration.set("zookeeper.znode.parent", zkNode);
            Connection replacement = HConnectionManager.createConnection(configuration);
            Connection previous = connection;
            connection = replacement;
            closeQuietly(previous);
        } catch (IOException e) {
            log.error("Failed to create HBase connection for znode parent {}", zkNode, e);
        }
    }

    /**
     * Closes the shared connection, if any. Tables can no longer be created afterwards
     * until {@link #setZkNode(String)} is called again.
     *
     * @throws IOException if closing the connection fails
     */
    @Override
    public void close() throws IOException {
        Connection current = connection;
        connection = null;
        if (current != null) {
            current.close();
        }
    }

    /** Closes a replaced connection, logging instead of propagating a failure. */
    private static void closeQuietly(Connection stale) {
        if (stale == null) {
            return;
        }
        try {
            stale.close();
        } catch (IOException e) {
            log.warn("Failed to close replaced HBase connection", e);
        }
    }
}
并且修改spring-hbase.xml如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:hdp="http://www.springframework.org/schema/hadoop"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.springframework.org/schema/hadoop
                           http://www.springframework.org/schema/hadoop/spring-hadoop-2.3.xsd">

    <!-- HDFS configuration. The element body is read as properties text (key=value),
         so the value must not be quoted: quotes would become part of the value. -->
    <hdp:configuration id="hadoopConfiguration">
        fs.defaultFS=hdfs://localhost:9000
    </hdp:configuration>

    <!-- HBase connection configuration -->
    <hdp:hbase-configuration id="hbaseConfiguration"
                             delete-connection="false"
                             zk-quorum="hdp3,hdp4,hdp5"
                             zk-port="2181"/>

    <!-- Table factory that builds every table on one shared connection;
         setting zkNode creates that connection. -->
    <bean id="tableFactory" class="com.hbase.SkynetHTableFactory" p:zkNode="/hbase-unsecure">
        <constructor-arg ref="hbaseConfiguration"/>
    </bean>

    <!-- Both templates are given the table factory explicitly. -->
    <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"
          p:configuration-ref="hbaseConfiguration"
          p:tableFactory-ref="tableFactory"/>

    <bean id="leafhbaseTemplate" class="com.hbase.LeafHbaseTemplate"
          p:configuration-ref="hbaseConfiguration"
          p:tableFactory-ref="tableFactory"/>
</beans>