HBase Java API 数据操作示例

    xiaoxiao2023-11-12  118

    代码:

    1. Maven 依赖（pom.xml）

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.hbase</groupId>
        <artifactId>HbaseTest</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
        <name>HbaseTest</name>
        <url>http://maven.apache.org</url>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!-- The sources use lambdas and try-with-resources, so they must be compiled as
                 Java 8; older maven-compiler-plugin versions default to a lower level. -->
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.2.4</version>
            </dependency>
            <!-- Resolves jdk.tools from the local JDK instead of a repository.
                 NOTE(review): presumably needed because a transitive Hadoop dependency of
                 hbase-client references jdk.tools; confirm. Only works on JDK 8 and earlier
                 (tools.jar was removed in JDK 9). -->
            <dependency>
                <groupId>jdk.tools</groupId>
                <artifactId>jdk.tools</artifactId>
                <version>1.8</version>
                <scope>system</scope>
                <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
            </dependency>
            <!-- The tests use JUnit 4 annotations (org.junit.Test), which JUnit 3.8.1 does not provide. -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>

    2. HBase 连接工具类

    package com.hbase.utils;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;

    /**
     * HBase connection utility.
     *
     * <p>Holds one process-wide {@link Connection}. It is created lazily on first use and
     * re-created if it has been closed. All reads and writes of the shared static state are
     * guarded by {@link #LOCK}, so concurrent callers cannot each create (and leak) a
     * connection.
     *
     * @author linhaiy
     * @date 2019.05.20
     */
    public class HBaseConn {

        /** Guards {@link #configuration} and {@link #connection}. Declared before INSTANCE so it is initialized first. */
        private static final Object LOCK = new Object();

        private static final HBaseConn INSTANCE = new HBaseConn();

        private static Configuration configuration;

        private static Connection connection;

        /** Builds the shared client configuration (ZooKeeper quorum {@code localhost:2181}) once. */
        public HBaseConn() {
            synchronized (LOCK) {
                try {
                    if (configuration == null) {
                        configuration = HBaseConfiguration.create();
                        configuration.set("hbase.zookeeper.quorum", "localhost:2181");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Returns the shared connection, creating it if it is absent or closed.
         *
         * @throws IOException if a new connection cannot be established
         */
        private static Connection openConnection() throws IOException {
            synchronized (LOCK) {
                if (connection == null || connection.isClosed()) {
                    connection = ConnectionFactory.createConnection(configuration);
                }
                return connection;
            }
        }

        /**
         * Returns the shared connection, creating it if it is absent or closed.
         *
         * @return the connection. If creation fails, the error is printed and the previous
         *     (possibly closed) connection or {@code null} is returned.
         */
        public Connection getConnection() {
            try {
                return openConnection();
            } catch (Exception e) {
                e.printStackTrace();
                synchronized (LOCK) {
                    return connection;
                }
            }
        }

        /** Static shortcut for {@link #getConnection()} on the singleton instance. */
        public static Connection getHBaseConn() {
            return INSTANCE.getConnection();
        }

        /**
         * Opens a {@link Table} handle on the shared connection. The caller must close it.
         *
         * @param tableName table name
         * @throws IOException if the connection cannot be established or the table handle
         *     cannot be created (rather than failing later with a NullPointerException)
         */
        public static Table getTable(String tableName) throws IOException {
            return openConnection().getTable(TableName.valueOf(tableName));
        }

        /** Closes the shared connection, if any. The next getter call re-creates it. */
        public static void closeConn() {
            synchronized (LOCK) {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (IOException ioe) {
                        ioe.printStackTrace();
                    }
                }
            }
        }
    }

    3. HBase 数据操作工具类

    package com.hbase.service;

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.util.Bytes;

    import com.hbase.utils.HBaseConn;

    /**
     * HBase data-operation utility.
     *
     * <p>Every method obtains its {@link Table} or admin handle from {@link HBaseConn} and
     * closes it before returning. Failures are printed to stderr. Boolean methods report a
     * failure by returning {@code false}; lookup methods report one by returning {@code null}.
     *
     * @author linhaiy
     * @date 2019.05.20
     */
    public class HBaseService {

        /** Number of rows fetched per RPC by the scanners this class creates. */
        private static final int SCAN_CACHING = 1000;

        /**
         * Creates an HBase table.
         *
         * @param tableName table name
         * @param cfs column-family names; each family keeps at most one version per cell
         * @return {@code true} if the table was created; {@code false} if it already existed
         *     or creation failed
         */
        public static boolean createTable(String tableName, String[] cfs) {
            try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {
                if (admin.tableExists(tableName)) {
                    return false;
                }
                HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
                Arrays.stream(cfs).forEach(cf -> {
                    HColumnDescriptor columnDescriptor = new HColumnDescriptor(cf);
                    columnDescriptor.setMaxVersions(1);
                    tableDescriptor.addFamily(columnDescriptor);
                });
                admin.createTable(tableDescriptor);
                return true;
            } catch (Exception e) {
                // Report the failure instead of claiming success.
                e.printStackTrace();
                return false;
            }
        }

        /**
         * Disables (if necessary) and deletes an HBase table.
         *
         * @param tableName table name
         * @return {@code true} if the table was deleted; {@code false} if it did not exist or
         *     deletion failed
         */
        public static boolean deleteTable(String tableName) {
            try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {
                TableName name = TableName.valueOf(tableName);
                if (!admin.tableExists(name)) {
                    return false;
                }
                // A table must be disabled before deletion, and disabling an
                // already-disabled table throws, so only disable when enabled.
                if (admin.isTableEnabled(name)) {
                    admin.disableTable(name);
                }
                admin.deleteTable(name);
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }

        /**
         * Writes one cell.
         *
         * @param tableName table name
         * @param rowKey row key
         * @param cfName column-family name
         * @param qualifier column qualifier
         * @param data cell value (stored as UTF-8 bytes via {@link Bytes#toBytes(String)})
         * @return {@code true} if the put succeeded, {@code false} on I/O failure
         */
        public static boolean putRow(String tableName, String rowKey, String cfName, String qualifier, String data) {
            try (Table table = HBaseConn.getTable(tableName)) {
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier), Bytes.toBytes(data));
                table.put(put);
                return true;
            } catch (IOException ioe) {
                ioe.printStackTrace();
                return false;
            }
        }

        /**
         * Writes a batch of puts.
         *
         * @param tableName table name
         * @param puts the puts to apply
         * @return {@code true} if the batch succeeded, {@code false} on I/O failure
         */
        public static boolean putRows(String tableName, List<Put> puts) {
            try (Table table = HBaseConn.getTable(tableName)) {
                table.put(puts);
                return true;
            } catch (IOException ioe) {
                ioe.printStackTrace();
                return false;
            }
        }

        /**
         * Reads a whole row.
         *
         * @param tableName table name
         * @param rowKey row key
         * @return the row (empty if absent), or {@code null} on I/O failure
         */
        public static Result getRow(String tableName, String rowKey) {
            try (Table table = HBaseConn.getTable(tableName)) {
                Get get = new Get(Bytes.toBytes(rowKey));
                return table.get(get);
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
            return null;
        }

        /**
         * Reads a row through a filter list.
         *
         * @param tableName table name
         * @param rowKey row key
         * @param filterList filters applied to the row's cells
         * @return the filtered row (possibly empty), or {@code null} on I/O failure
         */
        public static Result getRow(String tableName, String rowKey, FilterList filterList) {
            try (Table table = HBaseConn.getTable(tableName)) {
                Get get = new Get(Bytes.toBytes(rowKey));
                get.setFilter(filterList);
                return table.get(get);
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
            return null;
        }

        /**
         * Scans the whole table.
         *
         * @param tableName table name
         * @return a scanner the caller must close, or {@code null} on I/O failure
         */
        public static ResultScanner getScanner(String tableName) {
            Scan scan = new Scan();
            scan.setCaching(SCAN_CACHING);
            return openScanner(tableName, scan);
        }

        /**
         * Scans a row-key range.
         *
         * @param tableName table name
         * @param startRowKey first row key (inclusive)
         * @param endRowKey stop row key (exclusive)
         * @return a scanner the caller must close, or {@code null} on I/O failure
         */
        public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey) {
            return openScanner(tableName, rangeScan(startRowKey, endRowKey));
        }

        /**
         * Scans a row-key range through a filter list.
         *
         * @param tableName table name
         * @param startRowKey first row key (inclusive)
         * @param endRowKey stop row key (exclusive)
         * @param filterList filters applied to each row
         * @return a scanner the caller must close, or {@code null} on I/O failure
         */
        public static ResultScanner getScanner(String tableName, String startRowKey, String endRowKey,
                FilterList filterList) {
            Scan scan = rangeScan(startRowKey, endRowKey);
            scan.setFilter(filterList);
            return openScanner(tableName, scan);
        }

        /** Builds a cached scan over {@code [startRowKey, endRowKey)}. */
        private static Scan rangeScan(String startRowKey, String endRowKey) {
            Scan scan = new Scan();
            scan.setStartRow(Bytes.toBytes(startRowKey));
            scan.setStopRow(Bytes.toBytes(endRowKey));
            scan.setCaching(SCAN_CACHING);
            return scan;
        }

        /**
         * Opens a scanner on the table, printing and returning {@code null} on I/O failure.
         * NOTE(review): the Table handle is closed before the scanner is returned. This relies
         * on the scanner not needing the Table instance afterwards; confirm for the hbase-client
         * version in use, or hand the Table to the caller instead.
         */
        private static ResultScanner openScanner(String tableName, Scan scan) {
            try (Table table = HBaseConn.getTable(tableName)) {
                return table.getScanner(scan);
            } catch (IOException ioe) {
                ioe.printStackTrace();
            }
            return null;
        }

        /**
         * Deletes an entire row.
         *
         * @param tableName table name
         * @param rowKey row key
         * @return {@code true} if the delete succeeded, {@code false} on I/O failure
         */
        public static boolean deleteRow(String tableName, String rowKey) {
            try (Table table = HBaseConn.getTable(tableName)) {
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                table.delete(delete);
                return true;
            } catch (IOException ioe) {
                ioe.printStackTrace();
                return false;
            }
        }

        /**
         * Removes a column family from a table's schema.
         *
         * @param tableName table name
         * @param cfName column-family name
         * @return {@code true} if the family was removed, {@code false} on failure
         */
        public static boolean deleteColumnFamily(String tableName, String cfName) {
            try (HBaseAdmin admin = (HBaseAdmin) HBaseConn.getHBaseConn().getAdmin()) {
                admin.deleteColumn(tableName, cfName);
                return true;
            } catch (Exception e) {
                e.printStackTrace();
                return false;
            }
        }

        /**
         * Deletes one column (family:qualifier) from a row.
         *
         * @param tableName table name
         * @param rowKey row key
         * @param cfName column-family name
         * @param qualifier column qualifier
         * @return {@code true} if the delete succeeded, {@code false} on I/O failure
         */
        public static boolean deleteQualifier(String tableName, String rowKey, String cfName, String qualifier) {
            try (Table table = HBaseConn.getTable(tableName)) {
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                delete.addColumn(Bytes.toBytes(cfName), Bytes.toBytes(qualifier));
                table.delete(delete);
                return true;
            } catch (IOException ioe) {
                ioe.printStackTrace();
                return false;
            }
        }
    }

    4. 测试

    package com.hbase.test;

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Table;
    import org.junit.Test;

    import com.hbase.utils.HBaseConn;

    /**
     * HBase connection tests.
     *
     * @author linhaiy
     * @date 2019.05.20
     */
    public class HBaseConnTest {

        /** Opens the shared connection, closes it, and prints its closed state before and after. */
        @Test
        public void getConnTest() {
            Connection conn = HBaseConn.getHBaseConn();
            System.out.println(conn.isClosed());
            HBaseConn.closeConn();
            System.out.println(conn.isClosed());
        }

        /** Opens the US_POPULATION table and prints its name; I/O errors fail the test. */
        @Test
        public void getTableTest() throws IOException {
            try (Table table = HBaseConn.getTable("US_POPULATION")) {
                System.out.println(table.getName().getNameAsString());
            }
        }
    }

    package com.hbase.test;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.Test;

    import com.hbase.service.HBaseService;

    /**
     * Basic HBase operation tests against the "FileTable" table.
     * NOTE(review): these tests depend on each other's side effects (create, then insert,
     * then read/delete), but JUnit does not guarantee method order.
     *
     * @author linhaiy
     * @date 2019.05.20
     */
    public class HbaseTest {

        @Test
        public void createTable() {
            HBaseService.createTable("FileTable", new String[] { "fileInfo", "saveInfo" });
        }

        @Test
        public void addFileDetails() {
            HBaseService.putRow("FileTable", "rowkey1", "fileInfo", "name", "file1.txt");
            HBaseService.putRow("FileTable", "rowkey1", "fileInfo", "type", "txt");
            HBaseService.putRow("FileTable", "rowkey1", "fileInfo", "size", "1024");
            HBaseService.putRow("FileTable", "rowkey1", "saveInfo", "creator", "jixin");
            HBaseService.putRow("FileTable", "rowkey2", "fileInfo", "name", "file2.jpg");
            HBaseService.putRow("FileTable", "rowkey2", "fileInfo", "type", "jpg");
            HBaseService.putRow("FileTable", "rowkey2", "fileInfo", "size", "1024");
            HBaseService.putRow("FileTable", "rowkey2", "saveInfo", "creator", "jixin");
        }

        /** Prints rowkey1's key and file name, if the row exists. */
        @Test
        public void getFileDetails() {
            Result result = HBaseService.getRow("FileTable", "rowkey1");
            if (result != null && !result.isEmpty()) {
                System.out.println("rowkey=" + Bytes.toString(result.getRow()));
                System.out.println(
                        "fileName=" + Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
            }
        }

        /** Scans with start == stop == "rowkey2" and prints each row's key and file name. */
        @Test
        public void scanFileDetails() {
            ResultScanner scanner = HBaseService.getScanner("FileTable", "rowkey2", "rowkey2");
            if (scanner != null) {
                // Close the scanner even if printing a row throws.
                try {
                    scanner.forEach(result -> {
                        System.out.println("rowkey=" + Bytes.toString(result.getRow()));
                        System.out.println("fileName="
                                + Bytes.toString(result.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("name"))));
                    });
                } finally {
                    scanner.close();
                }
            }
        }

        @Test
        public void deleteRow() {
            HBaseService.deleteRow("FileTable", "rowkey1");
        }

        @Test
        public void deleteTable() {
            HBaseService.deleteTable("FileTable");
        }
    }

    package com.hbase.test;

    import java.util.Arrays;
    import java.util.function.Consumer;

    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.filter.BinaryComparator;
    import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.filter.FilterList;
    import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
    import org.apache.hadoop.hbase.filter.PrefixFilter;
    import org.apache.hadoop.hbase.filter.RowFilter;
    import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
    import org.apache.hadoop.hbase.filter.FilterList.Operator;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.junit.Test;

    import com.hbase.service.HBaseService;

    /**
     * HBase filter tests against the "File" table. Every scan covers row keys
     * [rowkey1, rowkey3); the stop row is exclusive, so rowkey3 itself is never returned.
     *
     * @author linhaiy
     * @date 2019.05.18
     */
    public class HBaseFilterTest {

        private static final byte[] FILE_INFO = Bytes.toBytes("fileInfo");
        private static final byte[] NAME = Bytes.toBytes("name");
        private static final byte[] TYPE = Bytes.toBytes("type");

        /** Creates the test table. */
        @Test
        public void createTable() {
            HBaseService.createTable("File", new String[] { "fileInfo", "saveInfo" });
        }

        /** Inserts the sample rows, one cell per put. */
        @Test
        public void addFileDetails() {
            HBaseService.putRow("File", "rowkey1", "fileInfo", "name", "file1.txt");
            HBaseService.putRow("File", "rowkey1", "fileInfo", "type", "txt");
            HBaseService.putRow("File", "rowkey1", "fileInfo", "size", "1024");
            HBaseService.putRow("File", "rowkey1", "saveInfo", "creator", "jixin");
            HBaseService.putRow("File", "rowkey2", "fileInfo", "name", "file2.jpg");
            HBaseService.putRow("File", "rowkey2", "fileInfo", "type", "jpg");
            HBaseService.putRow("File", "rowkey2", "fileInfo", "size", "1024");
            HBaseService.putRow("File", "rowkey2", "saveInfo", "creator", "jixin");
            HBaseService.putRow("File", "rowkey3", "fileInfo", "name", "file3.jpg");
            HBaseService.putRow("File", "rowkey3", "fileInfo", "type", "jpg");
            HBaseService.putRow("File", "rowkey3", "fileInfo", "size", "1024");
            HBaseService.putRow("File", "rowkey3", "saveInfo", "creator", "jixin");
        }

        /** Row filter: keeps only the row whose key equals "rowkey1". */
        @Test
        public void rowFilterTest() {
            // RowFilter compares row keys; BinaryComparator compares the full byte array.
            Filter filter = new RowFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("rowkey1")));
            // A FilterList chains filters: MUST_PASS_ALL means AND, MUST_PASS_ONE means OR.
            FilterList filterList = new FilterList(Operator.MUST_PASS_ONE, Arrays.asList(filter));
            scanAndPrint(filterList, HBaseFilterTest::printRowAndName);
        }

        /** Prefix filter: keeps rows whose key starts with "rowkey2". */
        @Test
        public void prefixFilterTest() {
            Filter filter = new PrefixFilter(Bytes.toBytes("rowkey2"));
            FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, Arrays.asList(filter));
            scanAndPrint(filterList, HBaseFilterTest::printRowAndName);
        }

        /** Key-only filter: returns cell keys without their original values. */
        @Test
        public void keyOnlyFilterTest() {
            Filter filter = new KeyOnlyFilter(true);
            FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, Arrays.asList(filter));
            scanAndPrint(filterList, result -> {
                System.out.println("rowkey=" + Bytes.toString(result.getRow()));
                // With lenAsVal=true, each value is replaced by the original value's length
                // encoded as a 4-byte int, so decode it instead of printing it as text.
                byte[] nameLength = result.getValue(FILE_INFO, NAME);
                System.out.println("fileNameLength="
                        + (nameLength == null ? "null" : String.valueOf(Bytes.toInt(nameLength))));
            });
        }

        /** Column-prefix filter: keeps only cells whose qualifier starts with "nam". */
        @Test
        public void columnPrefixFilterTest() {
            Filter filter = new ColumnPrefixFilter(Bytes.toBytes("nam"));
            FilterList filterList = new FilterList(Operator.MUST_PASS_ALL, Arrays.asList(filter));
            scanAndPrint(filterList, result -> {
                printRowAndName(result);
                // "type" does not match the prefix, so this is expected to print null.
                System.out.println("fileType=" + Bytes.toString(result.getValue(FILE_INFO, TYPE)));
            });
        }

        /**
         * Scans rows [rowkey1, rowkey3) of the "File" table through {@code filterList} and
         * passes each result to {@code printer}. The scanner is always closed.
         */
        private static void scanAndPrint(FilterList filterList, Consumer<Result> printer) {
            ResultScanner scanner = HBaseService.getScanner("File", "rowkey1", "rowkey3", filterList);
            if (scanner == null) {
                return;
            }
            try {
                scanner.forEach(printer);
            } finally {
                scanner.close();
            }
        }

        /** Prints a result's row key and its fileInfo:name value. */
        private static void printRowAndName(Result result) {
            System.out.println("rowkey=" + Bytes.toString(result.getRow()));
            System.out.println("fileName=" + Bytes.toString(result.getValue(FILE_INFO, NAME)));
        }
    }

     

    最新回复(0)