spark hbase,spark,hbase,spark 如何读取Hbase中存储的数据(scala代码实现) | AiTi修炼|重剑无锋,拈花微笑
Header
Header

spark 如何读取Hbase中存储的数据(scala代码实现)

spark可以在hbase中读取数据并转化为RDD,实现对Hbase中的数据进行计算。但需要注意的是一般生产环境中Hbase是基于zookeeper的高可用集群,因此在编写代码的时候需要保障spark能连接到zookeeper集群,然后借助zookeeper访问hbase。

spark连接到zookeeper有两种方法,分别是:

第一种是将hbase-site.xml文件加入classpath

第二种是在HBaseConfiguration实例中设置

ps:如果不设置,默认连接的是localhost:2181会报错:connection refused 

代码实现示例如下:
package com.test

import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark._
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io._

object HBaseToSaprk {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseToSpark").setMaster("local")
    val sc = new SparkContext(sparkConf)

    val tablename = "rscala_spark_user"
    val conf = HBaseConfiguration.create()
    //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
    conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
    //设置zookeeper连接端口,默认2181
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, tablename)

    // 如果表不存在则创建表
    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tablename)) {
      val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
      admin.createTable(tableDesc)
    }

    //读取数据并转化成rdd
    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = hBaseRDD.count()
    println(count)
    hBaseRDD.foreach{case (_,result) =>{
      //获取行键
      val key = Bytes.toString(result.getRow)
      //通过列族和列名获取列
      val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
      val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
      println("Row key:"+key+" Name:"+name+" Age:"+age)
    }}

    sc.stop()
    admin.close()
  }
}

ps:关于spark如何将数据保存到Hbase,可以参考http://rscala.com/index.php/397.html

rscala.com版权所有,本文spark 如何读取Hbase中存储的数据(scala代码实现)转载请注明出处:http://rscala.com/index.php/395.html

该文章归档分类于 scala实践, spark实践, 大数据基础

Leave a Reply

电子邮件地址不会被公开。 必填项已用*标注

*

code