Wednesday, January 28, 2015

How to use Scala on Spark to load data into HBase/MapRDB -- normal load or bulk load.

This article shows sample code to load data into HBase or MapRDB (M7) using Scala on Spark.
I will introduce two ways: one is a normal load using Put, and the other uses the bulk load API.

1. Normal Load using org.apache.hadoop.hbase.client.Put (for HBase and MapRDB)

This approach uses a Put object to load data one row at a time. It is not as efficient as bulk loading.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;

val conf = HBaseConfiguration.create()
val tableName = "/t1"
conf.set(TableInputFormat.INPUT_TABLE, tableName)

// Write a single cell: row "row999", column family "cf", qualifier "column_name".
val myTable = new HTable(conf, tableName)
val p = new Put(Bytes.toBytes("row999"))
p.add(Bytes.toBytes("cf"), Bytes.toBytes("column_name"), Bytes.toBytes("value999"))
myTable.put(p)
myTable.flushCommits()
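
The snippet above issues a single Put from the driver. If the data to load is already in an RDD, a minimal sketch of the same idea, assuming the data is an RDD of (rowKey, value) string pairs (the sample data, the table name "/t1" and the column names below are just placeholders), is to open one HTable per partition inside foreachPartition:

// Sketch: write an RDD of (rowKey, value) pairs in parallel with Put.
// HTable is not serializable, so each partition creates its own table handle.
val data = sc.parallelize(1 to 100).map(i => ("row" + i, "value" + i))
data.foreachPartition { partition =>
  val partConf = HBaseConfiguration.create()
  val partTable = new HTable(partConf, "/t1")
  partition.foreach { case (rowKey, value) =>
    val put = new Put(Bytes.toBytes(rowKey))
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("column_name"), Bytes.toBytes(value))
    partTable.put(put)
  }
  partTable.flushCommits()
  partTable.close()
}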

2. Bulk Load using HFiles (for HBase only)

This approach has two steps: first generate HFiles, then use org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles to load the HFiles into HBase.
It only works for HBase tables, not MapRDB tables, because MapRDB does not support bulk loading with HFiles.
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName) 

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
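// configureIncrementalLoad inspects the table's region boundaries and configures the
// job's partitioner and HFile output settings to match them.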
HFileOutputFormat.configureIncrementalLoad (job, table)

// Generate 10 rows of sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Save HFiles on HDFS
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)

// Bulk load the HFiles into HBase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)
After that, 10 rows are inserted:
hbase(main):020:0> scan 'hao'
ROW                                                 COLUMN+CELL
 \x00\x00\x00\x01                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x02                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x03                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x04                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x05                                   column=cf:c1, timestamp=1425128075586, value=value_xxx
 \x00\x00\x00\x06                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x07                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x08                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x09                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
 \x00\x00\x00\x0A                                   column=cf:c1, timestamp=1425128075675, value=value_xxx
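
Note that HFileOutputFormat expects the KeyValues to arrive sorted by row key (and by qualifier within a row). The example above already satisfies this because each row has a single cell and the keys are generated in ascending order; if each row has several columns, sort the RDD first, otherwise you may hit an "Added a key not lexically larger than previous" error. Below is a minimal sketch of that idea, assuming string row keys, two placeholder qualifiers ("c1", "c2") and a placeholder output path "/tmp/xxxx20":

// Sketch: several qualifiers per row. Sort by (row key, qualifier) before writing HFiles.
val multi = sc.parallelize(1 to 10).flatMap { x =>
  val row = "row%03d".format(x)
  Seq("c1", "c2").map { q =>
    ((row, q), new KeyValue(Bytes.toBytes(row), Bytes.toBytes("cf"), Bytes.toBytes(q), Bytes.toBytes("value_xxx")))
  }
}
// sortByKey orders the cells so that HFileOutputFormat receives them in lexicographic order.
val sortedRdd = multi.sortByKey().map { case ((row, _), kv) =>
  (new ImmutableBytesWritable(Bytes.toBytes(row)), kv)
}
sortedRdd.saveAsNewAPIHadoopFile("/tmp/xxxx20", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/xxxx20"), table)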

3. Direct Bulk Load without HFiles (for HBase and MapRDB)

This approach does not need to create HFiles on HDFS; it saves to the HBase/MapRDB table directly.
There is only a minor difference compared to the example above:
Change from:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
To:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
Here is a complete example:

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles

val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName) 

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

// Generate 10 rows of sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Directly bulk load into the HBase/MapRDB table.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())

Note: In the above example, I am using "saveAsNewAPIHadoopFile" to save HFiles on HDFS.
You can also use "saveAsNewAPIHadoopDataset" to achieve the same goal.
For example, just change the code below from:
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())
To:
job.getConfiguration.set("mapred.output.dir", "/tmp/xxxx19")
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)



21 comments:

  1. Hi, I have multiple column families and multiple qualifiers for each column family. Can you please guide me on how to represent those as key-value pairs? You have mentioned the following:
    val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
    })

    but I am unable to change the above line to accommodate multiple column families and qualifiers. Please help me with this.

    Replies
    1. You can create another RDD, for example rdd2 for column family 2,
      and then use "union":

      val totalrdd = rdd union rdd2

      See http://www.openkb.info/2015/01/scala-on-spark-cheatsheet.html for more details on each transformation and action.

    2. Would you please suggest how to construct the KeyValue pairs for multiple column qualifiers?

      e.g. I want to insert into hbase like:
      rowkey, column_family, column_qualifier, value
      John, cf, age, 29
      John, cf, sex, male

  2. Hi,
    I've been trying to do the same thing with rows that have multiple columns. I transform my RDD so that it is of type (ImmutableBytesWritable, KeyValue); however due to the fact that rows have multiple columns, some of the pairs in the RDD have the same row key, e.g.

    (rowKey1, [rowKey1, cf, col1, val1])
    (rowKey1, [rowKey1, cf, col2, val2])
    (rowKey1, [rowKey1, cf, col3, val3])

    When I run this code though, I get an exception that looks like this:

    Added a key not lexically larger than previous. Current cell = 155964:1/variant:155967/1461259975844/Put/vlen=577/seqid=0, lastCell = 155964:1/variant:coords/1461259975824/Put/vlen=29/seqid=0

    Is there any way around this?

    Replies
    1. I saw the same error when using multiple columns. I tried .sortByKey(true), but that did not help and I am still seeing not lexically larger than previous.

  3. NOT working. Even if the entire RDD is ordered, including row key, cf, and cq. And it should not, because HFileOutputFormat clearly says every cell needs to be in order. How will that work with a distributed RDD? Just not possible. Unless someone claims it's working for a truly distributed RDD, I think this post is misleading.

  4. Hi All, I have a task where I want to read XML data from HDFS and store it into HBase using Spark and Scala. Please help me with this.

  6. Spark Scala code to load data into hbase

    //HIVE Connectivity
    import java.lang.String
    import org.apache.spark.sql.functions._
    import org.apache.spark._
    import org.apache.spark.SparkContext._

    //HBASE Connectivity
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
    import org.apache.hadoop.hbase.client.HBaseAdmin
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HColumnDescriptor
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.HTable;

    //Set Hive Context
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    hiveContext.setConf("hive.exec.dynamic.partition", "true")
    hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    //Load Data from Hive Table to DataFrame
    val df = hiveContext.sql("select t.* from eagle_abhijeet.abhi t order by t.id")

    //GroupBy Logic for Generic Columns
    val gencol_df = df.select("id", "sal")
    val agg_df = gencol_df.groupBy("id").agg(sum("sal"))

    //Complete GroupBy for Distribution
    val dist_df = df.select("id", "at", "dt")
    val agg_dist_df = dist_df.groupBy("id", "at").agg(sum("dt"))


    //Connect to HBASE
    val config = HBaseConfiguration.create()
    config.clear();
    config.set("hbase.zookeeper.quorum", "10.10.127.7");
    config.set("hbase.zookeeper.property.clientPort","2181");

    //Connect to Table
    val tableName = "aggTable"
    config.set(TableInputFormat.INPUT_TABLE, tableName)
    val myTable = new HTable(config, tableName);

    //Store values in HBASE

    //Iterate through the Main DF and then append Dist DF
    agg_df.collect().foreach(elem => {

    var rowKey = elem.getString(0);
    var sal = elem.getDouble(1).toString;

    var p = new Put(rowKey.getBytes()); //Store RowKey
    p.add("CF".getBytes(), "sal".getBytes(), sal.getBytes()); //Store Sal

    //Iterate through Distribution
    agg_dist_df.filter($"id" === rowKey).collect().foreach(innerElem => {

    var acctTypCD = innerElem.get(1).toString;
    var acctTypDistribution = innerElem.getDouble(2).toString;
    p.add("CF".getBytes(), "DistByAcctType:".concat(acctTypCD).getBytes(), acctTypDistribution.getBytes())
    })
    //Commit to DB
    myTable.put(p);
    myTable.flushCommits();
    })

    //Exit the Code
    exit

    Replies
    1. can you share the POM.xml for your application?

    2. There is no pom.xml. The above code was executed in the spark shell. Since it is a pretty old version, some of the APIs may have changed, so feel free to correct the code for the latest release of Spark.

    3. Does this work with multiple columns in a column family?
      Can you share an example for the same.

