Kafka + Spark Streaming + HBase + MySQL integration example


    Flume pushes data into a Kafka topic, Spark Streaming consumes the topic and cleans/processes the records, and the results are finally stored in HBase.

    flume sink.conf

    To be added.
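    As a rough illustration only (the agent name, source type and log path below are assumptions, not from the original post), a Flume agent with a Kafka sink pointing at the test topic typically looks like this:

        # illustrative sketch -- adjust names, source and broker list to your setup
        a1.sources = r1
        a1.channels = c1
        a1.sinks = k1

        a1.sources.r1.type = exec
        a1.sources.r1.command = tail -F /var/log/app/app.log
        a1.sources.r1.channels = c1

        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000

        a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
        a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
        a1.sinks.k1.kafka.topic = test
        a1.sinks.k1.channel = c1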

    Create the topic ahead of time from the command line:

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
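    Before wiring up Flume, the topic can be sanity-checked with the console producer and consumer (assuming the same localhost:9092 broker):

        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
        bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning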

    Next comes the code.

    First, the pom.xml:

     

    <properties>
        <spark.version>2.4.0</spark.version>
        <scala.version>2.11</scala.version>
        <hbase.version>1.2.1</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>com.crealytics</groupId>
            <artifactId>spark-excel_2.11</artifactId>
            <version>0.11.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
            </plugin>
        </plugins>
    </build>

     

    import org.apache.hadoop.hbase.TableName
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object kafkaSparkHbase {

     

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("kafk")
        val ssc = new StreamingContext(conf, Seconds(2))

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.2.33:9092,192.168.3.38.12:9092,192.168.2.64:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "kgv",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val topics = Array("st")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )

        // split each record on "," and drop malformed lines: fields 0-4 are used when building the Put below
        val dataStream = stream.map(record => record.value).map(_.split(",")).filter(_.length >= 5)

        stream.map(record => record.value).print()

        dataStream.foreachRDD(rdd => {
          rdd.foreachPartition(partitionRecords => {
            val conn = HBaseUtil.getHbaseConn
            partitionRecords.foreach(data => {
              val tableName = TableName.valueOf("test")
              val t = conn.getTable(tableName)

              // In the HBase shell the table would be created with: create 'tableName','cf1','cf2'
              // rowkeys are stored in lexicographic order

              // HBase shell equivalent of an insert: put 'table', rowkey, 'cf:col', value
              val put = new Put(Bytes.toBytes(data(1)))

              // column family, column, value

              // column family cf1
              put.addColumn("cf1".getBytes(), "n".getBytes(), data(0).getBytes())
              put.addColumn("cf1".getBytes(), "Status".getBytes(), data(2).getBytes())

              // column family cf2
              put.addColumn("cf2".getBytes(), "Status".getBytes(), data(3).getBytes())
              put.addColumn("cf2".getBytes(), "Model".getBytes(), data(4).getBytes())

              t.put(put)
            })
          })
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }
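    The Put above writes into column families cf1 and cf2 of the table test, so that table has to exist before the job runs. Following the shell commands referenced in the comments, it would be created like this:

        # in the HBase shell
        create 'test','cf1','cf2'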

     

    import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
    import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}

    object HBaseUtil extends Serializable {
      private val conf = HBaseConfiguration.create()
      conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
      // conf.set(HConstants.ZOOKEEPER_QUORUM, ",,,")
      conf.set(HConstants.ZOOKEEPER_QUORUM, "h1,h2,h3")
      private val connection = ConnectionFactory.createConnection(conf)

      def getHbaseConn: Connection = connection
    }

     

    The main thing worth recording here is the HConstants.ZOOKEEPER_QUORUM setting used when connecting to HBase:

    the HBase connection cannot be made with raw IP addresses alone, so you usually have to map the quorum hostnames in the hosts file.
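    For example, with the quorum h1,h2,h3 used in HBaseUtil above, every machine that runs the job needs entries along these lines in /etc/hosts (the IP addresses below are hypothetical):

        192.168.2.101  h1
        192.168.2.102  h2
        192.168.2.103  h3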

    Note, however, that the code above inserts records into HBase one at a time, which is fairly slow.
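    Before reaching for bulk load, a lighter-weight improvement is to buffer the Puts and write one batch per partition instead of issuing one RPC per record. A minimal sketch reusing HBaseUtil and the test table from above (the field mapping is shortened for brevity):

        import scala.collection.JavaConverters._

        dataStream.foreachRDD { rdd =>
          rdd.foreachPartition { partitionRecords =>
            val conn = HBaseUtil.getHbaseConn
            val table = conn.getTable(TableName.valueOf("test"))
            val puts = partitionRecords.map { data =>
              val put = new Put(Bytes.toBytes(data(1)))
              put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("n"), Bytes.toBytes(data(0)))
              put
            }.toList
            if (puts.nonEmpty) table.put(puts.asJava) // Table.put(List[Put]) sends the whole batch at once
            table.close()
          }
        }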

    For genuinely large volumes, use the bulk load approach (covered in detail later in this post).

     

     

     

    Accessing MySQL from Spark is similar to accessing HBase: we need a serializable class that establishes the MySQL connection. Here we use a C3P0 connection pool.

    A reusable MySQL connection class:

    import java.sql.Connection
    import java.util.Properties

    import com.mchange.v2.c3p0.ComboPooledDataSource

    class MysqlPool extends Serializable {
      private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)
      private val conf = Conf.mysqlConfig
      try {
        cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8"))
        cpds.setDriverClass("com.mysql.jdbc.Driver")
        cpds.setUser(conf.get("username").getOrElse("root"))
        cpds.setPassword(conf.get("password").getOrElse(""))
        cpds.setMaxPoolSize(200)
        cpds.setMinPoolSize(20)
        cpds.setAcquireIncrement(5)
        cpds.setMaxStatements(180)
      } catch {
        case e: Exception => e.printStackTrace()
      }

      def getConnection: Connection = {
        try {
          cpds.getConnection
        } catch {
          case ex: Exception =>
            ex.printStackTrace()
            null
        }
      }
    }

    object MysqlManager {
      var mysqlManager: MysqlPool = _

      def getMysqlManager: MysqlPool = {
        synchronized {
          if (mysqlManager == null) {
            mysqlManager = new MysqlPool
          }
        }
        mysqlManager
      }
    }

    C3P0 maintains the MySQL connection pool; every time we need to write, we take a connection from the pool for the data transfer.

    Writing to MySQL: using the same foreachRDD pattern as before, the code that outputs a DStream to MySQL looks like this:

    dstream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        rdd.foreachPartition(partitionRecords => {
          // take a connection from the pool
          val conn = MysqlManager.getMysqlManager.getConnection
          val statement = conn.createStatement
          try {
            conn.setAutoCommit(false)
            partitionRecords.foreach(record => {
              val sql = "insert into table..." // the SQL statement to execute
              statement.addBatch(sql)
            })
            statement.executeBatch
            conn.commit
          } catch {
            case e: Exception => // do some logging
          } finally {
            statement.close()
            conn.close()
          }
        })
      }
    })

    A couple of points worth noting:

    We do not commit to MySQL once per record; the statements are submitted as a batch, which is why conn.setAutoCommit(false) is needed. Batching noticeably improves MySQL throughput. Also, updating indexed columns in MySQL is slow; avoid it where possible, and if it cannot be avoided, just push through (T^T).
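    A variant of the same pattern with a PreparedStatement keeps the batching while avoiding per-record SQL string building. This is only a sketch: the table and column names (t_demo, id, name) and the (String, String) record shape are placeholders, not from the original post:

        dstream.foreachRDD { rdd =>
          rdd.foreachPartition { partitionRecords =>
            val conn = MysqlManager.getMysqlManager.getConnection
            val stmt = conn.prepareStatement("INSERT INTO t_demo (id, name) VALUES (?, ?)")
            try {
              conn.setAutoCommit(false)
              partitionRecords.foreach { case (id, name) => // assumes (String, String) records
                stmt.setString(1, id)
                stmt.setString(2, name)
                stmt.addBatch()
              }
              stmt.executeBatch()
              conn.commit()
            } finally {
              stmt.close()
              conn.close()
            }
          }
        }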

    Deployment and the original write-up: https://cloud.tencent.com/developer/article/1004820

     

    // Spark with the new HBase API

    CRUD operations on HBase

    The new API introduces Connection as the entry point; HBaseAdmin becomes Admin and HTable becomes Table.

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, Get, Put, Scan}
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConversions._

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")

    // Creating a Connection is a heavyweight operation; it is thread-safe and is the entry point for all HBase operations
    val conn = ConnectionFactory.createConnection(conf)

    try {
      // create a table through Admin
      val admin = conn.getAdmin
      val userTable = TableName.valueOf("user")
      val tableDescr = new HTableDescriptor(userTable)
      // add a column family
      tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))
      if (admin.tableExists(userTable)) {
        admin.disableTable(userTable)
        admin.deleteTable(userTable)
      }
      admin.createTable(tableDescr)

      // CRUD: every operation on HBase starts with an operation object (Put, Get, Delete, Scan, ...),
      // which is then passed to the corresponding method on Table.
      // cf, col and rowkey below are example placeholders for a column family, column and row key.
      val cf = "basic"
      val col = "name"
      val rowkey = "id001"

      val table = conn.getTable(userTable)
      try {
        // insert: the Put is keyed by the rowkey
        val p = new Put(rowkey.getBytes)
        p.addColumn(cf.getBytes, col.getBytes, "some value".getBytes)
        table.put(p)

        // query a single row
        val g = new Get(rowkey.getBytes)
        val result = table.get(g)
        val value = Bytes.toString(result.getValue(cf.getBytes, col.getBytes))
        println(value)

        // scan the data
        val s = new Scan()
        s.addColumn(cf.getBytes, col.getBytes)
        val scanner = table.getScanner(s)
        try {
          for (r <- scanner) {
            println("Found row: " + r)
            println("Found value: " + Bytes.toString(r.getValue(cf.getBytes, col.getBytes)))
          }
        } finally {
          scanner.close()
        }

        // delete a cell
        val d = new Delete(rowkey.getBytes)
        d.addColumn(cf.getBytes, col.getBytes)
        table.delete(d)
      } finally {
        if (table != null) table.close()
      }
    } finally {
      conn.close()
    }

     

    // Writing an RDD to HBase
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.mapred.JobConf

    // define the HBase configuration
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "hosts")
    // specify the output format and the output table name
    val jobConf = new JobConf(conf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user")

    // Mapping the RDD onto the table schema

    In HBase a table schema generally looks like

        row        cf:col1        cf:col2

    whereas in Spark we operate on RDDs of tuples such as (1,"lilei",14) and (2,"hanmei",18). We need to turn RDD[(uid: Int, name: String, age: Int)] into RDD[(ImmutableBytesWritable, Put)], so we define a convert function to do this conversion:

    def convert(triple: (Int, String, Int)) = {
      val p = new Put(Bytes.toBytes(triple._1))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, p)
    }

     

    Read the RDD data and convert it:

    // read RDD data from somewhere and convert
    val rawData = List((1,"lilei",14), (2,"hanmei",18), (3,"someone",38))
    val localData = sc.parallelize(rawData).map(convert)

     

    Finally, write the RDD to HBase with saveAsHadoopDataset:

    localData.saveAsHadoopDataset(jobConf)

     

    Reading from HBase

    To read HBase from Spark, we mainly use the newAPIHadoopRDD API provided by SparkContext, which loads the table contents into Spark as an RDD.

    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")
    // the table to query
    conf.set(TableInputFormat.INPUT_TABLE, "user")

    val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = usersRDD.count()
    println("Users RDD Count:" + count)
    usersRDD.cache()

    // iterate over the results and print them
    usersRDD.foreach { case (_, result) =>
      val key = Bytes.toInt(result.getRow)
      val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
      val age = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }

    http://wuchong.me/blog/2015/04/06/spark-on-hbase-new-api/

     

     

     

    Reading and writing HBase from Spark: Spark's built-in APIs, and Bulk Load for importing large amounts of data into HBase

    <properties>
        <spark.version>2.3.0</spark.version>
        <hbase.version>1.2.6</hbase.version>
        <scala.main.version>2.11</scala.main.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.main.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <!-- fastjson is used in this example to parse JSON strings; not strictly required -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
    </dependencies>

    (1) Using saveAsNewAPIHadoopDataset()

    package com.bonc.rdpe.spark.hbase

    import com.alibaba.fastjson.JSON
    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.{SparkConf, SparkContext}

    object WriteHBaseWithNewHadoopAPI {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
        val sc = new SparkContext(sparkConf)
        val input = sc.textFile("file:///D:/data/news_profile_data.txt")

        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.13.185:2181")
        val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
        val admin = hbaseConn.getAdmin
        val jobConf = new JobConf(hbaseConf, this.getClass)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "news")

        if (!admin.tableExists(TableName.valueOf("news"))) {
          val desc = new HTableDescriptor(TableName.valueOf("news"))
          val hcd = new HColumnDescriptor("cf1")
          desc.addFamily(hcd)
          admin.createTable(desc)
        }

        val job = Job.getInstance(jobConf)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Result])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

        val data = input.map(jsonStr => {
          val jsonObject = JSON.parseObject(jsonStr)
          val newsId = jsonObject.get("id").toString.trim
          val title = jsonObject.get("title").toString.trim
          val put = new Put(Bytes.toBytes(newsId))
          put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("title"), Bytes.toBytes(title))
          (new ImmutableBytesWritable, put)
        })

        data.saveAsNewAPIHadoopDataset(job.getConfiguration)
        sc.stop()
      }
    }

    (2) Using saveAsHadoopDataset()

    package com.bonc.rdpe.spark.hbase

    import com.alibaba.fastjson.JSON
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{SparkConf, SparkContext}

    object WriteHBaseWithOldHadoopAPI {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
        val sc = new SparkContext(sparkConf)
        val input = sc.textFile("file:///D:/data/news_profile_data.txt")

        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.13.185:2181")
        val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
        val admin = hbaseConn.getAdmin
        val jobConf = new JobConf(hbaseConf, this.getClass)
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "news")
        jobConf.setOutputFormat(classOf[TableOutputFormat])

        if (!admin.tableExists(TableName.valueOf("news"))) {
          val desc = new HTableDescriptor(TableName.valueOf("news"))
          val hcd = new HColumnDescriptor("cf1")
          desc.addFamily(hcd)
          admin.createTable(desc)
        }

        val data = input.map(jsonStr => {
          val jsonObject = JSON.parseObject(jsonStr)
          val newsId = jsonObject.get("id").toString.trim
          val title = jsonObject.get("title").toString.trim
          val put = new Put(Bytes.toBytes(newsId))
          put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("title"), Bytes.toBytes(title))
          (new ImmutableBytesWritable, put)
        })

        data.saveAsHadoopDataset(jobConf)
        sc.stop()
      }
    }

    The two operators above are based on the new and the old Hadoop API respectively, and most of the code is identical. The differences to watch for: the new API uses the Job class while the old API uses JobConf, and the relevant classes live under org.apache.hadoop.mapreduce for the new API versus org.apache.hadoop.mapred for the old one.
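    Concretely, the key HBase-side difference is which TableOutputFormat gets imported; the two classes share a simple name, so only one can be used at a time:

        // new Hadoop API, paired with org.apache.hadoop.mapreduce.Job:
        import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
        // old Hadoop API, paired with org.apache.hadoop.mapred.JobConf:
        // import org.apache.hadoop.hbase.mapred.TableOutputFormat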

    The following code reads from HBase with the newAPIHadoopRDD() operator:

    package com.bonc.rdpe.spark.hbase

    import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.{SparkConf, SparkContext}

    import scala.collection.JavaConversions._

    object ReadHBase {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local").setAppName(s"${this.getClass.getSimpleName}")
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))
        val sc = new SparkContext(sparkConf)

        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", "172.16.13.185:2181")
        hbaseConf.set(TableInputFormat.INPUT_TABLE, "news")

        val hBaseRDD = sc.newAPIHadoopRDD(
          hbaseConf,
          classOf[TableInputFormat],
          classOf[ImmutableBytesWritable],
          classOf[Result])

        hBaseRDD.take(10).foreach(tuple => {
          val result = tuple._2
          printResult(result)
        })
      }

      def printResult(result: Result): Unit = {
        val cells = result.listCells
        for (cell <- cells) {
          printCell(cell)
        }
      }

      def printCell(cell: Cell): Unit = {
        val str = s"rowkey: ${Bytes.toString(CellUtil.cloneRow(cell))}, family:${Bytes.toString(CellUtil.cloneFamily(cell))}, " +
          s"qualifier:${Bytes.toString(CellUtil.cloneQualifier(cell))}, value:${Bytes.toString(CellUtil.cloneValue(cell))}, " +
          s"timestamp:${cell.getTimestamp}"
        println(str)
      }
    }

    Note that the code registers ImmutableBytesWritable with Kryo for serialization:

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))

    Otherwise the program fails with:

    java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable

    The write paths above insert data into HBase row by row. That is slow and ties up Region resources while the import is running, so it is a poor fit for loading large amounts of data in one go. The solution is to import the data with Bulk Load.

    Bulk Load exploits the fact that HBase stores its data on HDFS in a specific format: a MapReduce job writes persistent files in HBase's internal HFile format directly on HDFS, forming the table's data, and those files are then loaded straight into the running cluster. Because it bypasses the normal write path, it does not occupy Region resources and does not generate massive write I/O, so compared with going through the HBase API it needs noticeably less CPU and network.

    Next, how to bulk load data into HBase from Spark:

    package com.bonc.rdpe.spark.hbase

    import com.alibaba.fastjson.JSON
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.{SparkConf, SparkContext}

    object BulkLoad {

      val zookeeperQuorum = "172.16.13.185:2181"
      val dataSourcePath = "file:///D:/data/news_profile_data.txt"
      val hdfsRootPath = "hdfs://beh/"
      val hFilePath = "hdfs://beh/test/yyh/hbase/bulkload/hfile/"
      val tableName = "news"
      val familyName = "cf1"
      val qualifierName = "title"

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
        val sc = new SparkContext(sparkConf)

        val hadoopConf = new Configuration()
        hadoopConf.set("fs.defaultFS", hdfsRootPath)
        val fileSystem = FileSystem.get(hadoopConf)

        val hbaseConf = HBaseConfiguration.create(hadoopConf)
        hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
        val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
        val admin = hbaseConn.getAdmin

        // create the target table if it does not exist yet
        if (!admin.tableExists(TableName.valueOf(tableName))) {
          val desc = new HTableDescriptor(TableName.valueOf(tableName))
          val hcd = new HColumnDescriptor(familyName)
          desc.addFamily(hcd)
          admin.createTable(desc)
        }

        // clean up any HFiles left over from a previous run
        if (fileSystem.exists(new Path(hFilePath))) {
          fileSystem.delete(new Path(hFilePath), true)
        }

        val data = sc.textFile(dataSourcePath)
          .map(jsonStr => {
            val jsonObject = JSON.parseObject(jsonStr)
            val rowkey = jsonObject.get("id").toString.trim
            val title = jsonObject.get("title").toString.trim
            (rowkey, title)
          })
          .sortByKey()
          .map(tuple => {
            val kv = new KeyValue(Bytes.toBytes(tuple._1), Bytes.toBytes(familyName),
              Bytes.toBytes(qualifierName), Bytes.toBytes(tuple._2))
            (new ImmutableBytesWritable(Bytes.toBytes(tuple._1)), kv)
          })

        val table = hbaseConn.getTable(TableName.valueOf(tableName))
        val job = Job.getInstance(hbaseConf)
        job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setMapOutputValueClass(classOf[KeyValue])
        HFileOutputFormat2.configureIncrementalLoadMap(job, table)

        // write the HFiles to HDFS ...
        data.saveAsNewAPIHadoopFile(
          hFilePath,
          classOf[ImmutableBytesWritable],
          classOf[KeyValue],
          classOf[HFileOutputFormat2],
          hbaseConf
        )

        // ... then load them into the running cluster
        val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
        val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
        bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)

        hbaseConn.close()
        fileSystem.close()
        sc.stop()
      }
    }

    Notes:

    The rowkeys must be sorted (hence the sortByKey() above). The code uses saveAsNewAPIHadoopFile(); saveAsNewAPIHadoopDataset() also works. To switch, take this code:

    data.saveAsNewAPIHadoopFile(
      hFilePath,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      hbaseConf
    )

    and replace it with:

    job.getConfiguration.set("mapred.output.dir", hFilePath)
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)

    That is all it takes.

    https://cloud.tencent.com/developer/article/1336561
