This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d08688a  [CARBONDATA-3623]: Fixed global sort compaction failure on timestamp column
d08688a is described below

commit d08688a1e4164949c4761bf394a01752787457a8
Author: akkio-97 <akshay.nuth...@gmail.com>
AuthorDate: Wed Dec 18 19:24:13 2019 +0530

    [CARBONDATA-3623]: Fixed global sort compaction failure on timestamp column
    
    Problem:
    In CarbonData, timestamp values are converted to long and stored in the RDD.
    When collect (an action) is applied on the DataFrame, it fails because of a conflict
    between data types: long in the CarbonScanRDD (from which the DataFrame is created)
    versus Timestamp in the schema.
    
    Solution:
    Called Dataset.ofRows(LogicalPlan), which returns the DataFrame. Used the thread-level
    set command to restrict the query to the segments being compacted, and unset it once
    the compaction is finished.
    
    This closes #3515
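
    For illustration, a minimal sketch of the segment-scoped read pattern this change applies
    (set the thread-local CARBON_INPUT_SEGMENTS property to the segments being processed, build
    the DataFrame from the table name, then unset the property), assuming the CarbonUtils and
    SparkSQLUtil helpers shown in the diff below; the wrapper object and method names are
    illustrative only:

    import org.apache.hadoop.mapreduce.InputSplit
    import org.apache.spark.sql.{CarbonUtils, DataFrame, SparkSession}
    import org.apache.spark.sql.util.SparkSQLUtil
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.hadoop.CarbonInputSplit

    object SegmentScopedRead {
      // Illustrative helper: restrict the read to the splits' segments via the
      // thread-local CARBON_INPUT_SEGMENTS property, build the DataFrame from the
      // table name (avoiding the long-vs-Timestamp validation), then unset the
      // property so later queries on the same thread see all segments again.
      def loadSegmentsAsDataFrame(
          sparkSession: SparkSession,
          table: CarbonTable,
          splits: Seq[InputSplit]): DataFrame = {
        val segmentsKey = CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
          table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName
        try {
          CarbonUtils.threadSet(segmentsKey,
            splits.map(_.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
          SparkSQLUtil.createInputDataFrame(sparkSession, table)
        } finally {
          CarbonUtils.threadUnset(segmentsKey)
        }
      }
    }
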
---
 .../dataload/TestGlobalSortDataLoad.scala          | 19 +++++
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 44 +---------
 .../org/apache/spark/sql/util/SparkSQLUtil.scala   | 17 ++++
 .../spark/rdd/CarbonTableCompactor.scala           | 66 ++++++++-------
 .../management/CarbonInsertFromStageCommand.scala  | 94 ++++++++++------------
 5 files changed, 117 insertions(+), 123 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 2aad2f3..9882e15 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -140,6 +140,25 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
   }
 
   // ----------------------------------- Compaction -----------------------------------
+  test("compaction major: timestamp and long data type confliction")
+  {
+    sql("drop table if exists compactionTable")
+    sql("create table compactionTable (DOJ timestamp, DOB date) STORED BY 
'org.apache.carbondata.format'")
+    sql("alter table compactionTable set tblproperties('sort_columns'='doj, 
dob', 'sort_scope'='global_sort')")
+    sql("INSERT INTO compactionTable select '2017-10-12 21:22:23', 
'1997-10-10'")
+    sql("INSERT INTO compactionTable select '2018-11-12 20:22:23', 
'1997-10-10'")
+    sql("alter table compactionTable compact 'major'")
+    val showSegments = sql("show segments for table compactiontable").collect()
+    showSegments.find(_.get(0).toString.equals("0.1")) match {
+      case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Success"))
+    }
+    showSegments.find(_.get(0).toString.equals("1")) match {
+      case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Compacted"))
+    }
+    showSegments.find(_.get(0).toString.equals("0")) match {
+      case Some(row) => assert(row.get(1).toString.equalsIgnoreCase("Compacted"))
+    }
+  }
   test("Compaction GLOBAL_SORT * 2") {
     sql("DROP TABLE IF EXISTS carbon_localsort_twice")
     sql(
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index ae859c0..dc97cd9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -22,20 +22,16 @@ import java.util.Comparator
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
+import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
@@ -44,7 +40,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
-import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
 import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
@@ -52,9 +47,8 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
+import org.apache.carbondata.spark.rdd.StringArrayRow
 import org.apache.carbondata.spark.util.CommonUtil
-import org.apache.carbondata.store.CarbonRowReadSupport
 
 /**
  * Use sortBy operator in spark to load the data
@@ -429,38 +423,6 @@ object DataLoadProcessBuilderOnSpark {
     }
     loadModel
   }
-
-  /**
-   * create DataFrame basing on specified splits
-   */
-  def createInputDataFrame(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      splits: Seq[InputSplit]
-  ): DataFrame = {
-    val columns = carbonTable
-      .getCreateOrderColumn
-      .asScala
-      .map(_.getColName)
-      .toArray
-    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
-      sparkSession,
-      columnProjection = new CarbonProjection(columns),
-      null,
-      carbonTable.getAbsoluteTableIdentifier,
-      carbonTable.getTableInfo.serialize,
-      carbonTable.getTableInfo,
-      new CarbonInputMetrics,
-      null,
-      classOf[SparkDataTypeConverterImpl],
-      classOf[CarbonRowReadSupport],
-      splits.asJava)
-      .map { row =>
-        new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
-      }
-    SparkSQLUtil.execute(rdd, schema, sparkSession)
-  }
 }
 
 class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index 13e7c45..056e555 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -35,6 +35,8 @@ import org.apache.spark.sql.internal.{SessionState, SQLConf}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils}
 
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
 
@@ -257,4 +259,19 @@ object SparkSQLUtil {
     taskGroupDesc
   }
 
+  /**
+   * create DataFrame based on carbonTable
+   */
+  def createInputDataFrame(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable): DataFrame = {
+    /**
+     * [[org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType]] validates the
+     * datatype of column data and corresponding datatype in schema provided to create dataframe.
+     * Since carbonScanRDD gives Long data for timestamp column and corresponding column datatype in
+     * schema is Timestamp, this validation fails if we use createDataFrame API which takes rdd as
+     * input. Hence, using below API which creates dataframe from tablename.
+     */
+    sparkSession.sqlContext.table(carbonTable.getTableName)
+  }
 }
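
For illustration, a hypothetical, self-contained repro of the mismatch described in the comment above, assuming a plain local SparkSession; the column name, epoch value, and exception handling are illustrative only:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

    object TimestampSchemaMismatchRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("repro").getOrCreate()
        // Schema declares Timestamp, but the rows carry Long values, which is what
        // CarbonScanRDD hands back for timestamp columns.
        val schema = StructType(Seq(StructField("doj", TimestampType)))
        val longBackedRows = spark.sparkContext.parallelize(Seq(Row(1576684463000000L)))
        try {
          // The createDataFrame(rdd, schema) path: the action fails at runtime because
          // the external Long value cannot be converted/validated as a Timestamp.
          spark.createDataFrame(longBackedRows, schema).collect()
        } catch {
          case e: Exception => println(s"conversion failed as expected: ${e.getMessage}")
        } finally {
          spark.stop()
        }
        // The fix instead resolves the DataFrame from the table name, e.g.
        // spark.sqlContext.table("compactionTable"), so schema and row representation
        // come from the same relation and stay consistent.
      }
    }
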
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 56b6a2e..aeeec3a 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -26,38 +26,30 @@ import scala.collection.mutable
 
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{InputSplit, Job}
-import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{CarbonUtils, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.MergeIndexUtil
-import org.apache.spark.CarbonInputMetrics
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.GenericRow
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.metadata.datatype.{StructField, StructType}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
-import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat}
-import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.indexserver.DistributedRDDUtils
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.TableOptionConstant
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
 import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.store.CarbonRowReadSupport
 
 /**
  * This class is used to perform compaction on carbon table.
@@ -374,26 +366,40 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
       carbonMergerMapping: CarbonMergerMapping): Array[(String, Boolean)] = {
+    val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val splits = splitsOfSegments(
       sparkSession,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
+      table,
       carbonMergerMapping.validSegments)
-    val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(
-      sparkSession,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
-      splits.asScala)
-    // generate LoadModel which can be used global_sort flow
-    val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
-      sparkSession, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
-    outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
-    DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
-      sparkSession,
-      Option(dataFrame),
-      outputModel,
-      SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
-      .map { row =>
-        (row._1, FailureCauses.NONE == row._2._2.failureCauses)
-      }
+    var loadResult: Array[(String, Boolean)] = null
+    try {
+      CarbonUtils
+        .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+          table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
+          splits.asScala.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+      val dataFrame = SparkSQLUtil.createInputDataFrame(
+        sparkSession,
+        table)
+
+      // generate LoadModel which can be used global_sort flow
+      val outputModel = DataLoadProcessBuilderOnSpark.createLoadModelForGlobalSort(
+        sparkSession, table)
+      outputModel.setSegmentId(carbonMergerMapping.mergedLoadName.split("_")(1))
+      loadResult = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+        sparkSession,
+        Option(dataFrame),
+        outputModel,
+        SparkSQLUtil.sessionState(sparkSession).newHadoopConf())
+        .map { row =>
+          (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+        }
+    } finally {
+      CarbonUtils
+        .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+          table.getDatabaseName + "." +
+          table.getTableName)
+    }
+    loadResult
   }
 
   /**
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 0cbccdd..a4dd45b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -28,16 +28,12 @@ import com.google.gson.Gson
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.log4j.Logger
-import org.apache.spark.CarbonInputMetrics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{CarbonEnv, CarbonUtils, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
-import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -46,13 +42,11 @@ import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileSt
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, StageInput}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
-import org.apache.carbondata.spark.rdd.CarbonScanRDD
-import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
 /**
  * Collect stage input files and trigger a loading into carbon table.
@@ -264,14 +258,25 @@ case class CarbonInsertFromStageCommand(
       LOGGER.info(s"start to load ${splits.size} files into " +
                   s"${table.getDatabaseName}.${table.getTableName}")
       val start = System.currentTimeMillis()
-      val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
-      DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
-        spark,
-        Option(dataFrame),
-        loadModel,
-        SparkSQLUtil.sessionState(spark).newHadoopConf()
-      ).map { row =>
+      try {
+        CarbonUtils
+          .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+            table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
+            splits.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+        val dataFrame = SparkSQLUtil.createInputDataFrame(spark, table)
+        DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+          spark,
+          Option(dataFrame),
+          loadModel,
+          SparkSQLUtil.sessionState(spark).newHadoopConf()
+        ).map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
+        }
+      } finally {
+        CarbonUtils
+          .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+            table.getDatabaseName + "." +
+            table.getTableName)
       }
       LOGGER.info(s"finish data loading, time taken 
${System.currentTimeMillis() - start}ms")
 
@@ -311,15 +316,30 @@ case class CarbonInsertFromStageCommand(
     val start = System.currentTimeMillis()
     partitionDataList.map {
       case (partition, splits) =>
-        LOGGER.info(s"start to load ${splits.size} files into " +
-          s"${table.getDatabaseName}.${table.getTableName}. " +
-          s"Partition information: ${partition.mkString(",")}")
-        val dataFrame = createInputDataFrameOfInternalRow(spark, table, splits)
+        LOGGER.info(s"start to load ${ splits.size } files into " +
+          s"${ table.getDatabaseName }.${ table.getTableName }. " +
+          s"Partition information: ${ partition.mkString(",") }")
+        val dataFrame = try {
+          // Segments should be set for query here, because consider a scenario where custom
+          // compaction is triggered, so it can happen that all the segments might be taken into
+          // consideration instead of custom segments if we do not set, leading to duplicate data in
+          // compacted segment. To avoid this, segments to be considered are to be set in threadset.
+          CarbonUtils
+            .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+              table.getDatabaseName + CarbonCommonConstants.POINT +
+              table.getTableName,
+              splits.map(split => split.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
+          SparkSQLUtil.createInputDataFrame(spark, table)
+        } finally {
+          CarbonUtils.threadUnset(
+            CarbonCommonConstants.CARBON_INPUT_SEGMENTS + table.getDatabaseName +
+              CarbonCommonConstants.POINT +
+              table.getTableName)
+        }
         val columns = dataFrame.columns
         val header = columns.mkString(",")
         val selectColumns = columns.filter(!partition.contains(_))
         val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
-
         val loadCommand = CarbonLoadDataCommand(
           databaseNameOp = Option(table.getDatabaseName),
           tableName = table.getTableName,
@@ -337,7 +357,7 @@ case class CarbonInsertFromStageCommand(
         )
         loadCommand.run(spark)
     }
-    LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() 
- start}ms")
+    LOGGER.info(s"finish data loading, time taken ${ 
System.currentTimeMillis() - start }ms")
   }
 
   /**
@@ -481,35 +501,5 @@ case class CarbonInsertFromStageCommand(
     }
   }
 
-  /**
-   * create DataFrame basing on specified splits
-   */
-  private def createInputDataFrameOfInternalRow(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      splits: Seq[InputSplit]
-    ): DataFrame = {
-    val columns = carbonTable
-      .getCreateOrderColumn
-      .asScala
-      .map(_.getColName)
-      .toArray
-    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
-    val rdd: RDD[InternalRow] = new CarbonScanRDD[InternalRow](
-      sparkSession,
-      columnProjection = new CarbonProjection(columns),
-      null,
-      carbonTable.getAbsoluteTableIdentifier,
-      carbonTable.getTableInfo.serialize,
-      carbonTable.getTableInfo,
-      new CarbonInputMetrics,
-      null,
-      classOf[SparkDataTypeConverterImpl],
-      classOf[SparkRowReadSupportImpl],
-      splits.asJava
-    )
-    SparkSQLUtil.execute(rdd, schema, sparkSession)
-  }
-
   override protected def opName: String = "INSERT STAGE"
 }
