Hi All,

In one of my projects we are planning to use HBase as the back end.

My use case: I have about 1 TB of data that arrives as multiple files (each
file is around 40 GB, with 100 million rows and 102 columns per row).
I am loading these files using Spark + Phoenix, and the load is taking around
2 hours.

Can you please suggest how to fine-tune the load process, and also how to read
the data back using Spark?
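
For the read path, this is roughly what I have in mind (just a sketch; the
table name and ZooKeeper quorum below are placeholders). Is this the right
approach for data of this size?

    // Load the Phoenix table back as a DataFrame via the phoenix-spark connector
    val df = sqlContext.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> "MY_Table_Name", "zkUrl" -> "zkhost:2181"))
      .load()

    // e.g. verify a couple of key columns
    df.select("BASE_PROD_ID", "SRL_NR_ID").show(10)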

Environment details
==========================
Hadoop Distribution: Hortonworks
Spark Version: 1.6
HBase Version: 1.1.2
Phoenix Version: 4.4.0
Number of nodes: 19

Please find the create table statement and the load code below.


Thanks & Regards
   Radha krishna
Phoenix create table with one column family and 19 salt buckets 
===============================================================

CREATE TABLE IF NOT EXISTS MY_Table_Name (
  "BASE_PROD_ID" VARCHAR,
  "SRL_NR_ID" VARCHAR,
  "CLF_1"."PROD_ID" VARCHAR,
  ..... (102 columns)
  CONSTRAINT my_pk PRIMARY KEY (BASE_PROD_ID, SRL_NR_ID)
) SALT_BUCKETS=19, COMPRESSION='GZ';


Spark Code
==========

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object InsertRecords {

  def main(args: Array[String]): Unit = {

    try {

      val sparkConf = new SparkConf().setAppName("Phoenix_HbaseTest")
      val sparkContext = new SparkContext(sparkConf)
      val sqlContext = new SQLContext(sparkContext)

      // Column names, in the same order as the Phoenix table definition
      val schemaString = "BASE_PROD_ID SRL_NR_ID PROD_ID ..."

      // Generate the schema from schemaString (all columns as nullable strings)
      val schema = StructType(schemaString.split(" ").map(fieldName =>
        StructField(fieldName, StringType, nullable = true)))

      // Parse the \u001c-delimited input into Rows. The -1 split limit keeps
      // trailing empty fields, so rows ending in empty columns are not truncated.
      val inputRDD = sparkContext.textFile(args(0))
        .map(_.split("\u001c", -1))
        .map(p => Row.fromSeq((0 to 102).map { i =>
          // normalize the two key columns, keep the rest as-is
          if (i == 1 || i == 2) p(i).trim.toUpperCase else p(i)
        }))

      // Apply the schema to the RDD.
      val inputDF = sqlContext.createDataFrame(inputRDD, schema)

      // Write to Phoenix: args(1) = save mode, args(2) = table name,
      // args(3) = ZooKeeper quorum (zkUrl), args(4) = batch size
      inputDF.write
        .format("org.apache.phoenix.spark")
        .mode(args(1))
        .options(Map("table" -> args(2), "zkUrl" -> args(3), "-batchSize" -> args(4)))
        .save()

      sparkContext.stop()
    } catch {
      // A single Throwable case covers everything; a separate Exception case
      // after it would be unreachable.
      case t: Throwable => t.printStackTrace()
    }
  }
}
