sprak,spark hbase,saveAsNewAPIHadoopDataset,spark使用saveAsNewAPIHadoopDataset将数据保存到Hbase | AiTi修炼|重剑无锋,拈花微笑
Header
Header

spark使用saveAsNewAPIHadoopDataset将数据保存到Hbase

在spark实际使用过程,经常会遇到需要将运算结果数据写入保存到Hbase中,具体实现思路可以使用saveAsNewAPIHadoopDataset将数据保存到Hbase。


package com.rscala import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.spark._ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes object SparkToHBase { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("SparkToHBase").setMaster("local") val sc = new SparkContext(sparkConf) val tablename = "account" sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3") sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181") sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename) val job = new Job(sc.hadoopConfiguration) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) val indataRDD = sc.makeRDD(Array("1,rscal1,15","2,rscal2,16","3,rscal3,16")) val rdd = indataRDD.map(_.split(',')).map{arr=>{ val put = new Put(Bytes.toBytes(arr(0))) put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt)) (new ImmutableBytesWritable, put) }} rdd.saveAsNewAPIHadoopDataset(job.getConfiguration()) } }
ps:对于Spark 如何读取Hbase中存储的数据,可以参考http://rscala.com/index.php/395.html

rscala.com版权所有,本文spark使用saveAsNewAPIHadoopDataset将数据保存到Hbase转载请注明出处:http://rscala.com/index.php/397.html

该文章归档分类于 scala实践, spark实践, 大数据基础

Leave a Reply

电子邮件地址不会被公开。 必填项已用*标注

*

code