[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user asfgit closed the pull request at: https://github.com/apache/carbondata/pull/2971 ---
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r245260405

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
--- End diff --
Yes, we could reuse the conversion step and the final status update part, but I find that makes the code flow hard to read, so I only reuse the final status update part.
---
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r245237625

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    // initialize and prepare row counter
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    val convertStepRowCounter = sc.accumulator(0, "Convert Processor Accumulator")
+    val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
+    val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
+
+    // 1. Input
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
+    val inputRDD = CsvRDDHelper
+      .csvFileScanRDD(sparkSession, model, hadoopConf)
+      .mapPartitionsWithIndex { case (index, rows) =>
+        DataLoadProcessorStepOnSpark
+          .internalInputFunc(rows, index, modelBroadcast, inputStepRowCounter)
+      }
+
+    // 2. Convert
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+    val convertRDD = inputRDD
+      .mapPartitionsWithIndex { case (index, rows) =>
+        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+        DataLoadProcessorStepOnSpark
+          .convertFunc(rows, index, modelBroadcast, partialSuccessAccum, convertStepRowCounter)
+      }
+      .filter(_ != null)
+
+    // 3. Range partition by range_column
+    val configuration = DataLoadProcessBuilder.createConfiguration(model)
+    val rangeColumnIndex =
+      indexOfColumn(model.getRangePartitionColumn, configuration.getDataFields)
+    // convert RDD[CarbonRow] to RDD[(rangeColumn, CarbonRow)]
+    val keyRDD = convertRDD.keyBy(_.getObject(rangeColumnIndex))
+    // range partition by key
+    val numPartitions = getNumPartitions(configuration, model, convertRDD)
+    val objectOrdering: Ordering[Object] = createOrderingForColumn(model.getRangePartitionColumn)
+    import scala.reflect.classTag
+    val sampleRDD = getSampleRDD(sparkSession, model, hadoopConf, configuration, modelBroadcast)
+    val rangeRDD = keyRDD
+      .partitionBy(
+        new DataSkewRangePartitioner(numPartitions, sampleRDD)(objectOrdering, classTag[Object]))
+      .map(_._2)
+
+    // 4. Sort and Write data
+    sc.runJob(rangeRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
+      DataLoadProcessorStepOnSpark.sortAndWriteFunc(rows, context.partitionId, modelBroadcast,
+        writeStepRowCounter, conf.value.value))
+
+    // Log the number of rows in each step
+    LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Converter: " + convertStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Sort Processor: " + sortStepRowCounter.value)
+    LOGGER.info("Total rows processed in step Data Writer: " + writeStepRowCounter.value)
+
+    // Update status
+    if (partialSuccessAccum.value != 0) {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
+                               "Partial_Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    } else {
+      val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
+      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
+    }
+  }
+
+  /**
+   * provide RDD
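For readers new to the feature, here is a minimal sketch of how this load path is driven from SQL, assuming the RANGE_COLUMN table property and the 'scale_factor' load option this PR adds; the table, column, and path names are made up for illustration, and the statements would run through a Carbon-enabled Spark session (for example the test suite's sql helper):

    // Sketch only: RANGE_COLUMN marks the column that loadDataUsingRangeSort
    // uses to range-partition the input; names and data are illustrative.
    sql(
      """
        | CREATE TABLE carbon_range_demo (id INT, name STRING, city STRING, age INT)
        | STORED BY 'carbondata'
        | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT', 'SORT_COLUMNS'='name, city',
        |               'RANGE_COLUMN'='name')
      """.stripMargin)

    // A larger scale_factor yields fewer range partitions for the same input,
    // trading load parallelism for fewer, larger output files.
    sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE carbon_range_demo " +
      "OPTIONS('scale_factor'='10')")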
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r245224484

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
[quotes the same loadDataUsingRangeSort hunk shown in full above; the reply text is truncated in the archive]
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r245224217

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
[quotes the same loadDataUsingRangeSort hunk shown in full above; the reply text is truncated in the archive]
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r245223174

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
@@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
     }
   }
 
+  def internalInputFunc(
+      rows: Iterator[InternalRow],
+      index: Int,
+      modelBroadcast: Broadcast[CarbonLoadModel],
+      rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
+    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
+    val conf = DataLoadProcessBuilder.createConfiguration(model)
+    val rowParser = new RowParserImpl(conf.getDataFields, conf)
+    val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
+    TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
+      wrapException(e, model)
+    }
+
+    new Iterator[CarbonRow] {
+      override def hasNext: Boolean = rows.hasNext
+
+      override def next(): CarbonRow = {
+        var row: CarbonRow = null
+        val rawRow =
+          rows.next().asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]]
+        if (isRawDataRequired) {
+          row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+        } else {
+          row = new CarbonRow(rowParser.parseRow(rawRow))
+        }
+        rowCounter.add(1)
+        row
+      }
+    }
+  }
+
+  def internalSampleInputFunc(
--- End diff --
ok
---
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r24542

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
@@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
[same internalInputFunc / internalSampleInputFunc hunk as quoted above]
--- End diff --
The `rows` parameter of the two functions has a different type.
---
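Since the two functions differ mainly in the element type of `rows` and in how a raw Array[Object] is pulled out of each element, one possible unification is to parameterize a shared core on the extractor. This is a sketch only, assuming the same imports as the quoted file; the name internalInputFuncCore and this exact factoring are hypothetical, not from the PR:

    // Sketch only: a generic core that internalInputFunc and
    // internalSampleInputFunc could both delegate to; the task-failure
    // listener from the quoted code is omitted here for brevity.
    def internalInputFuncCore[T](
        rows: Iterator[T],
        extractRaw: T => Array[Object],
        index: Int,
        modelBroadcast: Broadcast[CarbonLoadModel],
        rowCounter: Accumulator[Int]): Iterator[CarbonRow] = {
      val model = modelBroadcast.value.getCopyWithTaskNo(index.toString)
      val conf = DataLoadProcessBuilder.createConfiguration(model)
      val rowParser = new RowParserImpl(conf.getDataFields, conf)
      val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
      rows.map { r =>
        val rawRow = extractRaw(r)
        rowCounter.add(1)
        // keep the raw row only when later steps need it, as in the quoted code
        if (isRawDataRequired) {
          new CarbonRow(rowParser.parseRow(rawRow), rawRow)
        } else {
          new CarbonRow(rowParser.parseRow(rawRow))
        }
      }
    }

internalInputFunc would then only supply the InternalRow extractor, e.g. internalInputFuncCore[InternalRow](rows, _.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[Object]], index, modelBroadcast, rowCounter), and the sample variant its own.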
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r245198317

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---
@@ -106,6 +106,24 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
   }
 
+  test("Make sure the result is right and sorted in global level for range_sort") {
--- End diff --
ok
---
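The hunk above shows only the new test's declaration; as a sketch of the kind of assertion the suite's neighbouring tests use (the body below is illustrative, not the PR's actual test; $inputPath and carbon_range_sort are assumed names):

    test("Make sure the result is right and sorted in global level for range_sort") {
      // Sketch only: load the same file into a range-sorted table and compare
      // against the local-sort reference table from the surrounding suite.
      sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_range_sort " +
        "OPTIONS('scale_factor'='3')")
      checkAnswer(
        sql("SELECT * FROM carbon_range_sort ORDER BY name"),
        sql("SELECT * FROM carbon_localsort_once ORDER BY name"))
    }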
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r245198367

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+import scala.util.hashing.byteswap32
+
+import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{CollectionsUtils, Utils}
+
+/**
+ * support data skew scenario
+ * copied from Spark's RangePartitioner
+ */
+class DataSkewRangePartitioner[K: Ordering : ClassTag, V](
--- End diff --
ok
---
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681963

--- Diff: integration/spark-common/src/main/scala/org/apache/spark/DataSkewRangePartitioner.scala ---
@@ -0,0 +1,319 @@
[same DataSkewRangePartitioner header as quoted above]
--- End diff --
Please add a comment explaining how it differs from Spark's range partitioner and how you do the skew partitioning.
---
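For readers following the thread, the rough idea behind the skew handling (a hedged illustration of skew-aware range bounds, not this class's actual internals): Spark's RangePartitioner draws one bound per partition from a sorted sample, so a key frequent enough to occupy several consecutive bounds funnels all of its rows into a single partition; a skew-aware variant keeps such a bound once but records how many partitions that key should own, then spreads its rows over them.

    import scala.reflect.ClassTag

    // Sketch only: turns repeated sample bounds into "this key owns n
    // partitions" weights; illustrative, not the PR's implementation.
    def skewAwareBounds[K: Ordering : ClassTag](
        sortedSample: Array[K],
        partitions: Int): (Array[K], Map[K, Int]) = {
      val step = math.max(1, sortedSample.length / partitions)
      val candidates = (step until sortedSample.length by step).map(sortedSample(_))
      // a bound that repeats n times marks a skewed key deserving n partitions
      val skewWeights = candidates.groupBy(identity).filter(_._2.size > 1)
        .mapValues(_.size).toMap
      (candidates.distinct.toArray, skewWeights)
    }

The partitioner can then route rows with a skewed key round-robin across that key's reserved partitions instead of piling them into one.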
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681720

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
[quotes the same loadDataUsingRangeSort hunk shown in full above; the review comment is truncated in the archive]
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681813

--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---
@@ -106,6 +106,24 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
[same range_sort test hunk as quoted above]
--- End diff --
Please add some test cases with scale_factor as well.
---
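A sketch of what such coverage could look like (hedged: the option name is the one this PR introduces; the table, path, and expectedRowCount below are illustrative assumptions):

    // Sketch only: reload with several scale_factor values and check that
    // the result is independent of how many ranges the load used.
    for (scaleFactor <- Seq(1, 3, 10)) {
      sql(s"LOAD DATA LOCAL INPATH '$inputPath' OVERWRITE INTO TABLE carbon_range_sort " +
        s"OPTIONS('scale_factor'='$scaleFactor')")
      assert(sql("SELECT * FROM carbon_range_sort").count() == expectedRowCount)
    }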
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681164

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
[quotes the same loadDataUsingRangeSort hunk shown in full above; the review comment is truncated in the archive]
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244681062

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---
@@ -95,6 +96,67 @@ object DataLoadProcessorStepOnSpark {
[same internalInputFunc / internalSampleInputFunc hunk as quoted above]
--- End diff --
Please unify `internalSampleInputFunc` and `internalInputFunc`.
---
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244680876

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
[quotes the same loadDataUsingRangeSort hunk shown in full above; the review comment is truncated in the archive]
[GitHub] carbondata pull request #2971: [CARBONDATA-3219] Support range partition the...
Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2971#discussion_r244670461

--- Diff: integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---
@@ -156,4 +161,206 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
+
+  /**
+   * 1. range partition the whole input data
+   * 2. for each range, sort the data and write it to CarbonData files
+   */
+  def loadDataUsingRangeSort(
--- End diff --
Code is duplicated with method `loadDataUsingGlobalSort`; please try to extract the common code and reuse it here.
---
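One hedged way to act on this: the status-update tail quoted earlier is identical in both load paths, so it could move into a shared private helper. The name updateLoadStatus and this exact factoring are illustrative, not taken from the PR; the body is lifted verbatim from the quoted hunk.

    // Sketch only: shared tail of loadDataUsingGlobalSort and
    // loadDataUsingRangeSort, extracted into one place.
    private def updateLoadStatus(model: CarbonLoadModel, partialSuccessAccum: Accumulator[Int]
      ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
      if (partialSuccessAccum.value != 0) {
        val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
                                 "Partial_Success"
        val loadMetadataDetails = new LoadMetadataDetails()
        loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
        val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
        executionErrors.failureCauses = FailureCauses.BAD_RECORDS
        Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
      } else {
        val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
        val loadMetadataDetails = new LoadMetadataDetails()
        loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
        val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
        Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
      }
    }

Both load methods would then end with a single call, updateLoadStatus(model, partialSuccessAccum).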