Hi All,

In one of my projects we are planning to use HBase as the back end.
My use case: I have about 1 TB of data that arrives as multiple files (each file is around 40 GB, with 100 million rows and 102 columns per row). I am trying to load these files using Spark + Phoenix and it is taking around 2 hours. Can you please suggest how to fine-tune the load process, and also how to read the data back using Spark?

Environment details
==========================
Hadoop Distribution : Hortonworks
Spark Version       : 1.6
HBase Version       : 1.1.2
Phoenix Version     : 4.4.0
Number of nodes     : 19

Please find the create and load scripts attached below.

Thanks & Regards
Radha krishna
Phoenix create table with one column family and 19 salt buckets
===============================================================

CREATE TABLE IF NOT EXISTS MY_Table_Name(
    "BASE_PROD_ID"      VARCHAR,
    "SRL_NR_ID"         VARCHAR,
    "CLF_1"."PROD_ID"   VARCHAR,
    ..... ( 102 columns )
    CONSTRAINT my_pk PRIMARY KEY (BASE_PROD_ID, SRL_NR_ID))
SALT_BUCKETS=19, COMPRESSION='GZ';

Spark Code
==========

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object InsertRecords {
  def main(args: Array[String]): Unit = {
    try {
      val sparkConf = new SparkConf().setAppName("Phoenix_HbaseTest")
      val sparkContext = new SparkContext(sparkConf)
      val sqlContext = new SQLContext(sparkContext)

      val schemaString = "BASE_PROD_ID SRL_NR_ID PROD_ID ..."

      // Generate the schema from the space-separated list of column names,
      // treating every column as a nullable string.
      val schema = StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

      // Parse the input file (fields separated by \u001c) into Rows;
      // the second and third fields are trimmed and upper-cased, the rest are passed through.
      val input_rdd = sparkContext.textFile(args(0))
        .map(_.split("\u001c"))
        .map(p => Row.fromSeq(
          p(0) +: p(1).trim().toUpperCase() +: p(2).trim().toUpperCase() +: (3 to 102).map(p(_))))

      // Apply the schema to the RDD and write through the phoenix-spark connector.
      val inputDF = sqlContext.createDataFrame(input_rdd, schema)
      inputDF.write
        .format("org.apache.phoenix.spark")
        .mode(args(1))
        .options(Map("table" -> args(2), "zkUrl" -> args(3), "-batchSize" -> args(4)))
        .save()

      sparkContext.stop()
    } catch {
      // A single Throwable case covers everything; a separate Exception case would be unreachable.
      case t: Throwable => t.printStackTrace()
    }
  }
}
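For reading the table back into Spark I was planning something along the following lines, based on the phoenix-spark data source (just a sketch, not yet tested; the table name, ZooKeeper URL, and filter value are placeholders, not my real values):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ReadRecords {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext(new SparkConf().setAppName("Phoenix_HbaseRead"))
    val sqlContext = new SQLContext(sparkContext)

    // Load the Phoenix table as a DataFrame through the phoenix-spark data source.
    // "MY_TABLE_NAME" and the zkUrl below are placeholders for my actual table and quorum.
    val df = sqlContext.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> "MY_TABLE_NAME", "zkUrl" -> "zk-host:2181:/hbase-unsecure"))
      .load()

    // Filters on the leading primary-key column should, as I understand it, be pushed
    // down to Phoenix; the literal value here is only an example.
    df.filter(df("BASE_PROD_ID") === "12345").show()

    sparkContext.stop()
  }
}

Is this the recommended way to read the data back, or is there a faster approach for full-table scans of this size?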