This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new d08688a  [CARBONDATA-3623]: Fixed global sort compaction failure on timestamp column
d08688a is described below

commit d08688a1e4164949c4761bf394a01752787457a8
Author: akkio-97 <akshay.nuth...@gmail.com>
AuthorDate: Wed Dec 18 19:24:13 2019 +0530

    [CARBONDATA-3623]: Fixed global sort compaction failure on timestamp column

    Problem: In CarbonData, timestamp values are converted to long and stored in the RDD.
    When collect (an action) is applied on the dataframe, it fails because of a conflict
    between data types: long in CarbonScanRDD (from which the dataframe is created) versus
    Timestamp in the schema.

    Solution: Call Dataset.ofRows(LogicalPlan), which returns the dataframe. Set the
    segments that are to be compacted before the query, and unset them once the
    compaction is finished.

    This closes #3515
---
 .../dataload/TestGlobalSortDataLoad.scala          | 19 +++
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 44 +---------
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   | 17 ++++
 .../spark/rdd/CarbonTableCompactor.scala           | 66 ++++++-------
 .../management/CarbonInsertFromStageCommand.scala  | 94 ++++++++------------
 5 files changed, 117 insertions(+), 123 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 2aad2f3..9882e15 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -140,6 +140,25 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
   }
 
   // ----------------------------------- Compaction -----------------------------------
+  test("compaction major: timestamp and long data type confliction")
+  {
+    sql("drop table if exists compactionTable")
+    sql("create table compactionTable (DOJ timestamp, DOB date) STORED BY 'org.apache.carbondata.format'")
+    sql("alter table compactionTable set tblproperties('sort_columns'='doj, dob', 'sort_scope'='global_sort')")
+    sql("INSERT INTO compactionTable select '2017-10-12 21:22:23', '1997-10-10'")
+    sql("INSERT INTO compactionTable select '2018-11-12 20:22:23', '1997-10-10'")
+    sql("alter table compactionTable compact 'major'")
+    val showSegments = sql("show segments for table compactiontable").collect()
+    showSegments.find(_.get(0).toString.equals("0.1")) match {
+      case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Success"))
+    }
+    showSegments.find(_.get(0).toString.equals("1")) match {
+      case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Compacted"))
+    }
+    showSegments.find(_.get(0).toString.equals("0")) match {
+      case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Compacted"))
+    }
+  }
   test("Compaction GLOBAL_SORT * 2") {
     sql("DROP TABLE IF EXISTS carbon_localsort_twice")
     sql(
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index ae859c0..dc97cd9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -22,20 +22,16 @@ import java.util.Comparator
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
+import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
@@ -44,7 +40,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
-import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
 import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
@@ -52,9 +47,8 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
+import org.apache.carbondata.spark.rdd.StringArrayRow
 import org.apache.carbondata.spark.util.CommonUtil
-import org.apache.carbondata.store.CarbonRowReadSupport
 
 /**
  * Use sortBy operator in spark to load the data
@@ -429,38 +423,6 @@ object DataLoadProcessBuilderOnSpark {
     }
     loadModel
   }
-
-  /**
-   * create DataFrame basing on specified splits
-   */
-  def createInputDataFrame(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      splits: Seq[InputSplit]
-  ): DataFrame = {
-    val columns = carbonTable
-      .getCreateOrderColumn
-      .asScala
-      .map(_.getColName)
-      .toArray
-    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
-      sparkSession,
-      columnProjection = new CarbonProjection(columns),
-      null,
-      carbonTable.getAbsoluteTableIdentifier,
-      carbonTable.getTableInfo.serialize,
-      carbonTable.getTableInfo,
-      new CarbonInputMetrics,
-      null,
-      classOf[SparkDataTypeConverterImpl],
-      classOf[CarbonRowReadSupport],
-      splits.asJava)
-      .map { row =>
-        new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
-      }
-    SparkSQLUtil.execute(rdd, schema, sparkSession)
-  }
 }
 
 class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 13e7c45..056e555 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -35,6 +35,8 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
 
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
 object SparkSQLUtil {
 
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -257,4 +259,19 @@ object SparkSQLUtil {
     taskGroupDesc
   }
 
+  /**
+   * create DataFrame based on carbonTable
+   */
+  def createInputDataFrame(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable): DataFrame = {
+    /**
+     * [[org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType]] validates the
+     * datatype of column data and corresponding datatype in schema provided to create dataframe.
+     * Since carbonScanRDD gives Long data for timestamp column and corresponding column datatype in
+     * schema is Timestamp, this validation fails if we use createDataFrame API which takes rdd as
+     * input. Hence, using below API which creates dataframe from tablename.
+     */
+    sparkSession.sqlContext.table(carbonTable.getTableName)
+  }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 56b6a2e..aeeec3a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -26,38 +26,30 @@ import scala.collection.mutable
 
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{InputSplit, Job}
-import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.MergeIndexUtil
-import org.apache.spark.CarbonInputMetrics
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.GenericRow
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.datatype.{StructField, StructType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat}
-import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.indexserver.DistributedRDDUtils
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.TableOptionConstant
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
 import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.store.CarbonRowReadSupport
 
 /**
  * This class is used to perform compaction on carbon table.
@@ -374,26 +366,40 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
       carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val splits = splitsOfSegments(
       sparkSession,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+      table,
       carbonMergerMapping.validSegments)
-    val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(
-      sparkSession,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
-      splits.asScala)
-    // generate LoadModel which can be used global_sort flow
-    val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
-      sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
-    outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
-    DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
-      sparkSession,
-      Option(dataFrame),
-      outputModel,
-      SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
-      .map { row =>
-        (row._1, FailureCauses.NONE == row._2._2.failureCauses)
-      }
+    var loadResult: Array[(String, Boolean)] = null
+    try {
+      CarbonUtils
+        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+          table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
+          splits.asScala.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+      val dataFrame = SparkSQLUtil.createInputDataFrame(
+        sparkSession,
+        table)
+
+      // generate LoadModel which can be used global_sort flow
+      val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
+        sparkSession, table)
+      outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
+      loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+        sparkSession,
+        Option(dataFrame),
+        outputModel,
+        SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+        .map { row =>
+          (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+        }
+    } finally {
+      CarbonUtils
+        .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+          table.getDatabaseName + "." +
+          table.getTableName)
+    }
+    loadResult
   }
 
   /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 0cbccdd..a4dd45b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -28,16 +28,12 @@ import com.google.gson.Gson
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.log4j.Logger
-import org.apache.spark.CarbonInputMetrics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{CarbonEnv, CarbonUtils, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -46,13 +42,11 @@ import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileSt
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
 /**
  * Collect stage input files and trigger a loading into carbon table.
@@ -264,14 +258,25 @@ case class CarbonInsertFromStageCommand(
     LOGGER.info(s"start to load ${splits.size} files into " +
                 s"${table.getDatabaseName}.${table.getTableName}")
     val start = System.currentTimeMillis()
-    val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
-    DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
-      spark,
-      Option(dataFrame),
-      loadModel,
-      SparkSQLUtil.sessionState(spark).newHadoopConf()
-    ).map { row =>
+    try {
+      CarbonUtils
+        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+          table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
+          splits.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+      val dataFrame = SparkSQLUtil.createInputDataFrame(spark, table)
+      DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+        spark,
+        Option(dataFrame),
+        loadModel,
+        SparkSQLUtil.sessionState(spark).newHadoopConf()
+      ).map { row =>
         (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+      }
+    } finally {
+      CarbonUtils
+        .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+          table.getDatabaseName + "." +
+          table.getTableName)
     }
 
     LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
@@ -311,15 +316,30 @@ case class CarbonInsertFromStageCommand(
     val start = System.currentTimeMillis()
 
     partitionDataList.map { case (partition, splits) =>
-      LOGGER.info(s"start to load ${splits.size} files into " +
-        s"${table.getDatabaseName}.${table.getTableName}. " +
-        s"Partition information: ${partition.mkString(",")}")
-      val dataFrame = createInputDataFrameOfInternalRow(spark, table, splits)
+      LOGGER.info(s"start to load ${ splits.size } files into " +
+                  s"${ table.getDatabaseName }.${ table.getTableName }. " +
+                  s"Partition information: ${ partition.mkString(",") }")
+      val dataFrame = try {
+        // Segments should be set for query here, because consider a scenario where custom
+        // compaction is triggered, so it can happen that all the segments might be taken into
+        // consideration instead of custom segments if we do not set, leading to duplicate data in
+        // compacted segment. To avoid this, segments to be considered are to be set in threadset.
+        CarbonUtils
+          .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+            table.getDatabaseName + CarbonCommonConstants.POINT +
+            table.getTableName,
+            splits.map(split => split.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+        SparkSQLUtil.createInputDataFrame(spark, table)
+      } finally {
+        CarbonUtils.threadUnset(
+          CarbonCommonConstants.CARBON_INPUT_SEGMENTS + table.getDatabaseName +
+          CarbonCommonConstants.POINT +
+          table.getTableName)
+      }
       val columns = dataFrame.columns
       val header = columns.mkString(",")
       val selectColumns = columns.filter(!partition.contains(_))
       val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
-
       val loadCommand = CarbonLoadDataCommand(
         databaseNameOp = Option(table.getDatabaseName),
         tableName = table.getTableName,
@@ -337,7 +357,7 @@ case class CarbonInsertFromStageCommand(
       )
       loadCommand.run(spark)
     }
-    LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms")
+    LOGGER.info(s"finish data loading, time taken ${ System.currentTimeMillis() - start }ms")
   }
 
   /**
@@ -481,35 +501,5 @@ case class CarbonInsertFromStageCommand(
     }
   }
 
-  /**
-   * create DataFrame basing on specified splits
-   */
-  private def createInputDataFrameOfInternalRow(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      splits: Seq[InputSplit]
-  ): DataFrame = {
-    val columns = carbonTable
-      .getCreateOrderColumn
-      .asScala
-      .map(_.getColName)
-      .toArray
-    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[InternalRow] = new CarbonScanRDD[InternalRow](
-      sparkSession,
-      columnProjection = new CarbonProjection(columns),
-      null,
-      carbonTable.getAbsoluteTableIdentifier,
-      carbonTable.getTableInfo.serialize,
-      carbonTable.getTableInfo,
-      new CarbonInputMetrics,
-      null,
-      classOf[SparkDataTypeConverterImpl],
-      classOf[SparkRowReadSupportImpl],
-      splits.asJava
-    )
-    SparkSQLUtil.execute(rdd, schema, sparkSession)
-  }
-
   override protected def opName: String = "INSERT STAGE"
 }
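
Editor's note: the pattern this commit introduces is small and repeated in all three call sites: scope the query to the segments being compacted via the CARBON_INPUT_SEGMENTS thread-local property, build the DataFrame from the table (Dataset.ofRows under the hood, via SparkSQLUtil.createInputDataFrame) instead of from a raw RDD[InternalRow], and always unset the property in a finally block. A minimal sketch of that pattern follows; it only uses APIs that appear in the diff above, while the object name, method name, and the segmentIds parameter are illustrative, not part of the commit.

```scala
import org.apache.spark.sql.{CarbonUtils, DataFrame, SparkSession}
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

object CompactionInputSketch {

  /**
   * Build the input DataFrame for a global-sort compaction, restricted to the
   * given segment ids. Creating the DataFrame from the table name avoids the
   * ValidateExternalType failure seen when a Timestamp column is backed by
   * long values in the scan RDD.
   */
  def inputDataFrameForSegments(
      spark: SparkSession,
      table: CarbonTable,
      segmentIds: Seq[String]): DataFrame = {
    // Thread-local property key: CARBON_INPUT_SEGMENTS prefix + <db>.<table>
    val segmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
      table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName
    try {
      // Limit the query to the segments chosen for compaction; otherwise all
      // segments would be read and the compacted segment could hold duplicates.
      CarbonUtils.threadSet(segmentsKey, segmentIds.mkString(","))
      // DataFrame built from the table itself rather than from an
      // RDD[InternalRow], so column data types and schema stay consistent.
      SparkSQLUtil.createInputDataFrame(spark, table)
    } finally {
      // Restore the thread-local state so later queries on this thread
      // are not accidentally restricted to these segments.
      CarbonUtils.threadUnset(segmentsKey)
    }
  }
}
```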