[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/carbondata/pull/2971


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-04 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r245260405
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
--- End diff --

Yes, we can reuse the conversion step and the final status update part,
but I find that makes the code flow harder to read,
so I will try to reuse only the final status update part.
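
For illustration, a minimal sketch of what sharing only the final status update could look like. The helper name `updateLoadStatus` and its exact signature are assumptions for this sketch, not code from the pull request; it only rearranges the two status branches already shown in the quoted diff:

// Hypothetical helper: builds the load-status tuple that both loadDataUsingGlobalSort
// and loadDataUsingRangeSort currently construct inline at the end of the method.
private def updateLoadStatus(
    model: CarbonLoadModel,
    partialSuccessAccum: Accumulator[Int]): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
  val loadMetadataDetails = new LoadMetadataDetails()
  val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
  val uniqueLoadStatusId = if (partialSuccessAccum.value != 0) {
    // some rows became bad records, so the segment is only partially successful
    loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
    executionErrors.failureCauses = FailureCauses.BAD_RECORDS
    model.getTableName + CarbonCommonConstants.UNDERSCORE + "Partial_Success"
  } else {
    loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
    model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
  }
  Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
}

Both load paths could then end with a single call to updateLoadStatus(model, partialSuccessAccum).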


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-04 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r245237625
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    // initialize and prepare row counter
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, hadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark
+          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
+      }
+      .filter(_ != null)
+
+    // 3. Range partition by range_column
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val rangeColumnIndex =
+      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+    // range partition by key
+    val numPartitions = getNumPartitions(configuration, model, convertRDD)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+    import scala.reflect.classTag
+    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
+    val rangeRDD = keyRDD
+      .partitionBy(
+        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+      .map(_._2)
+
+    // 4. Sort and Write data
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
+        writeStepRowCounter, conf.value.value))
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
+
+    // Update status
+    if (partialSuccessAccum.value != 0) {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+        "Partial_Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    } else {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    }
+  }
+
+  /**
+   * provide RDD

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-03 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r245224484
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    // initialize and prepare row counter
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, hadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark
+          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
+      }
+      .filter(_ != null)
+
+    // 3. Range partition by range_column
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val rangeColumnIndex =
+      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+    // range partition by key
+    val numPartitions = getNumPartitions(configuration, model, convertRDD)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+    import scala.reflect.classTag
+    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
+    val rangeRDD = keyRDD
+      .partitionBy(
+        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+      .map(_._2)
+
+    // 4. Sort and Write data
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
+        writeStepRowCounter, conf.value.value))
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
+
+    // Update status
+    if (partialSuccessAccum.value != 0) {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+        "Partial_Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    } else {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    }
+  }
+
+  /**
+   * provide RDD

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-03 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r245224217
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    // initialize and prepare row counter
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, hadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark
+          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
+      }
+      .filter(_ != null)
+
+    // 3. Range partition by range_column
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val rangeColumnIndex =
+      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+    // range partition by key
+    val numPartitions = getNumPartitions(configuration, model, convertRDD)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+    import scala.reflect.classTag
+    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
+    val rangeRDD = keyRDD
+      .partitionBy(
+        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+      .map(_._2)
+
+    // 4. Sort and Write data
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
+        writeStepRowCounter, conf.value.value))
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
+
+    // Update status
+    if (partialSuccessAccum.value != 0) {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+        "Partial_Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    } else {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    }
+  }
+
+  /**
+   * provide RDD

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-03 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r245223174
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
@@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def internalInputFunc(
+      rows: Iterator[InternalRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        var row: CarbonRow = null
+        val rawRow =
+          rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
+        if (isRawDataRequired) {
+          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          row = new CarbonRow(rowParser.parseRow(rawRow))
+        }
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
+  def internalSampleInputFunc(
--- End diff --

ok


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-03 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r24542
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
@@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def internalInputFunc(
+      rows: Iterator[InternalRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        var row: CarbonRow = null
+        val rawRow =
+          rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
+        if (isRawDataRequired) {
+          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          row = new CarbonRow(rowParser.parseRow(rawRow))
+        }
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
+  def internalSampleInputFunc(
--- End diff --

The "rows" parameter of the two functions has a different type.
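
Since the two functions differ mainly in the element type of "rows", one hedged way to unify them is sketched below: pass a per-row extractor. The helper name commonInputFunc and its shape are assumptions for illustration, not code from the pull request (the task-failure listener from the original is omitted for brevity):

// Sketch: shared input body parameterized over the input element type.
def commonInputFunc[T](
    rows: Iterator[T],
    index: Int,
    modelBroadcast: Broadcast[CarbonLoadModel],
    rowCounter: Accumulator[Int],
    extractRawRow: T => Array[Object]): Iterator[CarbonRow] = {
  val model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
  val conf = DataLoadProcessBuilder.createConfiguration(model)
  val rowParser = new RowParserImpl(conf.getDataFields, conf)
  val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
  rows.map { input =>
    val rawRow = extractRawRow(input)
    rowCounter.add(1)
    if (isRawDataRequired) {
      new CarbonRow(rowParser.parseRow(rawRow), rawRow)
    } else {
      new CarbonRow(rowParser.parseRow(rawRow))
    }
  }
}

// internalInputFunc would then only supply the InternalRow extractor:
//   commonInputFunc(rows, index, modelBroadcast, rowCounter,
//     (r: InternalRow) => r.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]])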


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-03 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r245198317
  
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---
@@ -106,6 +106,24 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
   }
 
+  test("Make sure the result is right and sorted in global level for range_sort") {
--- End diff --

ok


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-03 Thread QiangCai
Github user QiangCai commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r245198367
  
--- Diff: integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+import scala.util.hashing.byteswap32
+
+import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{CollectionsUtils, Utils}
+
+/**
+ * support data skew scenario
+ * copy from spark: RangePartitioner
+ */
+class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
--- End diff --

ok


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-02 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r244681963
  
--- Diff: integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+import scala.util.hashing.byteswap32
+
+import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{CollectionsUtils, Utils}
+
+/**
+ * support data skew scenario
+ * copy from spark: RangePartitioner
+ */
+class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
--- End diff --

Please add a comment describing how it is different from Spark's range partitioner
and how the skew partitioning is done.
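
To make the request concrete, here is a self-contained sketch, not DataSkewRangePartitioner's actual code, of the general idea such a comment would document: if a heavily repeated key produces the same range bound several times in the sample, the duplicate bounds are collapsed and that key is given several output partitions instead of one.

// Illustration only: derive bounds from a sample and record how many partitions
// each skewed bound should span.
object SkewBoundsSketch {
  /** Returns (distinct bounds, number of partitions reserved per bound). */
  def computeSkewedBounds[K: Ordering](sample: Seq[K], partitions: Int): (Seq[K], Seq[Int]) = {
    val sorted = sample.sorted
    val step = math.max(1, sorted.length / partitions)
    // naive equi-depth bounds over the sorted sample
    val rawBounds = (1 until partitions).flatMap(i => sorted.lift(i * step - 1))
    // collapse repeated bounds, remembering how many partitions each one spans
    val grouped = rawBounds.foldLeft(Vector.empty[(K, Int)]) { (acc, bound) =>
      acc.lastOption match {
        case Some((prev, n)) if prev == bound => acc.init :+ ((prev, n + 1))
        case _ => acc :+ ((bound, 1))
      }
    }
    (grouped.map(_._1), grouped.map(_._2))
  }
}

For example, computeSkewedBounds(Seq(1, 1, 1, 1, 1, 1, 2, 3), 4) returns bounds Seq(1) with Seq(3) partitions reserved for the skewed value 1, so its rows can be spread across three tasks instead of one.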


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-02 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r244681720
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    // initialize and prepare row counter
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, hadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark
+          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
+      }
+      .filter(_ != null)
+
+    // 3. Range partition by range_column
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val rangeColumnIndex =
+      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+    // range partition by key
+    val numPartitions = getNumPartitions(configuration, model, convertRDD)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+    import scala.reflect.classTag
+    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
+    val rangeRDD = keyRDD
+      .partitionBy(
+        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+      .map(_._2)
+
+    // 4. Sort and Write data
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
+        writeStepRowCounter, conf.value.value))
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
+
+    // Update status
+    if (partialSuccessAccum.value != 0) {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+        "Partial_Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    } else {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    }
+  }
+
+  /**
+   * provide

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-02 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r244681813
  
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---
@@ -106,6 +106,24 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
   }
 
+  test("Make sure the result is right and sorted in global level for range_sort") {
--- End diff --

Please add some test cases with the scale_factor option as well.
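
A sketch of the kind of test being requested, modeled on the existing range_sort test in this file. The table schema, the csvPath variable, and the exact spelling of the RANGE_COLUMN property and SCALE_FACTOR load option are assumptions and would need to follow the PR's documentation:

  // Hypothetical test: load with a range column and an explicit scale factor,
  // then compare against the locally sorted reference table used by this suite.
  test("range_sort with scale_factor") {
    sql("DROP TABLE IF EXISTS carbon_range_sort_scale")
    sql(
      """
        | CREATE TABLE carbon_range_sort_scale(id INT, name STRING, city STRING, age INT)
        | STORED BY 'carbondata'
        | TBLPROPERTIES('RANGE_COLUMN'='name')
      """.stripMargin)
    // csvPath is assumed to point at the same CSV used by the other tests in this suite
    sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE carbon_range_sort_scale " +
      "OPTIONS('SCALE_FACTOR'='10')")
    checkAnswer(
      sql("SELECT * FROM carbon_range_sort_scale ORDER BY name"),
      sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
  }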


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-02 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r244681164
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    // initialize and prepare row counter
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, hadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark
+          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
+      }
+      .filter(_ != null)
+
+    // 3. Range partition by range_column
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val rangeColumnIndex =
+      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+    // range partition by key
+    val numPartitions = getNumPartitions(configuration, model, convertRDD)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+    import scala.reflect.classTag
+    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
+    val rangeRDD = keyRDD
+      .partitionBy(
+        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+      .map(_._2)
+
+    // 4. Sort and Write data
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
+        writeStepRowCounter, conf.value.value))
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
+
+    // Update status
+    if (partialSuccessAccum.value != 0) {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+        "Partial_Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    } else {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    }
+  }
+
+  /**
+   * provide

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-02 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r244681062
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
@@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def internalInputFunc(
+      rows: Iterator[InternalRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        var row: CarbonRow = null
+        val rawRow =
+          rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
+        if (isRawDataRequired) {
+          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          row = new CarbonRow(rowParser.parseRow(rawRow))
+        }
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
+  def internalSampleInputFunc(
--- End diff --

Please unify `internalSampleInputFunc` and `internalInputFunc`


---


[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-02 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r244680876
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    // initialize and prepare row counter
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, hadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark
+          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
+      }
+      .filter(_ != null)
+
+    // 3. Range partition by range_column
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val rangeColumnIndex =
+      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+    // range partition by key
+    val numPartitions = getNumPartitions(configuration, model, convertRDD)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+    import scala.reflect.classTag
+    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
+    val rangeRDD = keyRDD
+      .partitionBy(
+        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+      .map(_._2)
+
+    // 4. Sort and Write data
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
+        writeStepRowCounter, conf.value.value))
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
+
+    // Update status
+    if (partialSuccessAccum.value != 0) {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+        "Partial_Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    } else {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    }
+  }
+
+  /**
+   * provide

[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...

2019-01-01 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2971#discussion_r244670461
  
--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
--- End diff --

Code is duplicated with the method `loadDataUsingGlobalSort`; please try to
extract the common code and reuse it here.
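
One possible shape for such an extraction is sketched below. The helper name buildConvertedRDD and its signature are assumptions, not code from the pull request, and the reply earlier in this thread notes that sharing the whole flow can hurt readability, so only the convert and status-update parts may end up shared in practice:

// Sketch: the input + convert steps of loadDataUsingRangeSort pulled into one helper,
// assuming the global-sort path can be adapted to call it as well.
private def buildConvertedRDD(
    sparkSession: SparkSession,
    model: CarbonLoadModel,
    hadoopConf: Configuration,
    modelBroadcast: Broadcast[CarbonLoadModel],
    partialSuccessAccum: Accumulator[Int],
    inputStepRowCounter: Accumulator[Int],
    convertStepRowCounter: Accumulator[Int]): RDD[CarbonRow] = {
  val sc = sparkSession.sparkContext
  hadoopConf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sc.appName)
  // the broadcast Hadoop conf is also needed later by the write step, so a real
  // refactor would probably create it in the caller and pass it in
  val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
  CsvRDDHelper
    .csvFileScanRDD(sparkSession, model, hadoopConf)
    .mapPartitionsWithIndex { case (index, rows) =>
      DataLoadProcessorStepOnSpark
        .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
    }
    .mapPartitionsWithIndex { case (index, rows) =>
      ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
      DataLoadProcessorStepOnSpark
        .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
    }
    .filter(_ != null)
}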


---