[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r379262676
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##
 @@ -834,4 +843,179 @@ object CommonUtil {
 displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType: ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType: StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType: MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>
 
 Review comment:
   No, we cannot send it to the 'other' branch.
   **The 'other' branch would return a UTF8String, but we need a java.lang.String object here.**
   
   Added a comment explaining this now.
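   For readers of this thread, a minimal sketch of the distinction (my illustration, not code from the PR; ordinal 0 is arbitrary):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StringType

// The generic accessor hands back Spark's internal UTF8String, while
// getString(...) converts it to the java.lang.String the insert flow needs here.
def readStringColumn(row: InternalRow): (AnyRef, String) = {
  val internalValue = row.get(0, StringType) // org.apache.spark.unsafe.types.UTF8String
  val javaString = row.getString(0)          // java.lang.String
  (internalValue, javaString)
}
```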


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801875
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,517 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, 
CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var logicalPlan: LogicalPlan,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
 
 Review comment:
   done




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801536
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##
 @@ -834,4 +843,179 @@ object CommonUtil {
 displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType: ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType: StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType: MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>
 
 Review comment:
   It was carried over from the base code.
   
   Yes, done like that.




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801973
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, 
UpdateTableModel}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+* insert into with df, doesn't use logical plan
+*
+*/
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String],
+    isOverwriteTable: Boolean,
+    var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
 
 Review comment:
   done




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801595
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
 ##
 @@ -140,6 +141,89 @@ object CarbonScalaUtil {
 }
   }
 
+  /**
+   * Converts incoming value to String after converting data as per the data type.
+   *
+   * @param value           Input value to convert
+   * @param dataType        Datatype to convert and then convert to String
+   * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
+   * @param dateFormat      DataFormat to convert in case of DateType datatype
+   * @return converted String
+   */
+  def convertStaticPartitionToValues(
+      value: String,
+      dataType: DataType,
+      timeStampFormat: SimpleDateFormat,
+      dateFormat: SimpleDateFormat): AnyRef = {
+    val defaultValue = value != null && value.equalsIgnoreCase(hiveDefaultPartition)
+    try {
+      dataType match {
+        case TimestampType if timeStampFormat != null =>
+          val formattedString =
+            if (defaultValue) {
+              timeStampFormat.format(new Date())
+            } else {
+              timeStampFormat.format(DateTimeUtils.stringToTime(value))
+            }
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(formattedString,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(TimestampType))
+          convertedValue
+        case DateType if dateFormat != null =>
+          val formattedString =
+            if (defaultValue) {
+              dateFormat.format(new Date())
+            } else {
+              dateFormat.format(DateTimeUtils.stringToTime(value))
+            }
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(formattedString,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(TimestampType))
+          val date = generateDictionaryKey(convertedValue.asInstanceOf[Long])
+          date.asInstanceOf[AnyRef]
+        case BinaryType =>
+          // TODO: decode required ? currently it is working
+          ByteUtil.toBytes(value)
+        case _ =>
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(value,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataType))
+          if (convertedValue == null) {
+            if (defaultValue) {
+              dataType match {
+                case BooleanType =>
+                  return false.asInstanceOf[AnyRef]
+                case _ =>
+                  return 0.asInstanceOf[AnyRef]
+              }
+            }
+            throw new MalformedCarbonCommandException(
+              s"Value $value with datatype $dataType on static partition is not correct")
+          }
+          convertedValue
+      }
+    } catch {
+      case e: Exception =>
+        throw new MalformedCarbonCommandException(
+          s"Value $value with datatype $dataType on static partition is not correct")
+    }
+  }
+
+  def generateDictionaryKey(timeValue: Long): Int = {
+    if (timeValue < DateDirectDictionaryGenerator.MIN_VALUE ||
+        timeValue > DateDirectDictionaryGenerator.MAX_VALUE) {
+      if (LOGGER.isDebugEnabled) {
+        LOGGER.debug("Value for date type column is not in valid range. Value considered as null.")
+      }
+      return CarbonCommonConstants.DIRECT_DICT_VALUE_NULL
+    }
+    Math.floor(timeValue.toDouble / DateDirectDictionaryGenerator.MILLIS_PER_DAY).toInt +
+    DateDirectDictionaryGenerator.cutOffDate
 
 Review comment:
   no change
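   For context, a hypothetical usage sketch of the new helper quoted above (the partition value and formats are illustrative, not taken from the PR):

```scala
import java.text.SimpleDateFormat
import org.apache.spark.sql.types.DateType
import org.apache.carbondata.spark.util.CarbonScalaUtil

val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
// Resolves a static partition value declared on a DATE column; for DateType the
// result is the internal surrogate key produced by generateDictionaryKey.
val resolved: AnyRef = CarbonScalaUtil.convertStaticPartitionToValues(
  "2020-02-13", DateType, timestampFormat, dateFormat)
```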




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801434
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ##
 @@ -263,17 +265,35 @@ class NewDataFrameLoaderRDD[K, V](
     carbonLoadModel.setPreFetch(false)

     val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
-    val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
     val serializer = SparkEnv.get.closureSerializer.newInstance()
     var serializeBytes: Array[Byte] = null
-    while(partitionIterator.hasNext) {
-      val value = partitionIterator.next()
-      if (serializeBytes == null) {
-        serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
+    if (isDataFrame) {
+      val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit,
+        context)
+      while (partitionIterator.hasNext) {
+        val value = partitionIterator.next()
+        if (serializeBytes == null) {
+          serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
+        }
+        recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+          carbonLoadModel, context)
       }
-      recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+    } else {
+      // For internal row, no need of converter and re-arrange step,
+      model.setLoadWithoutConverterWithoutReArrangeStep(true)
 
 Review comment:
   yes. done like that




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801215
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##
 @@ -953,28 +983,46 @@ object CarbonDataRDDFactory {
 
   /**
    * Execute load process to load from input dataframe
+   *
+   * @param sqlContext sql context
+   * @param dataFrame optional dataframe for insert
+   * @param scanResultRDD optional internal row rdd for direct insert
+   * @param carbonLoadModel load model
+   * @return Return an array that contains all of the elements in NewDataFrameLoaderRDD.
    */
   private def loadDataFrame(
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
+      scanResultRDD: Option[RDD[InternalRow]],
       carbonLoadModel: CarbonLoadModel
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     try {
-      val rdd = dataFrame.get.rdd
-
+      val rdd = if (dataFrame.isDefined) {
+        dataFrame.get.rdd
+      } else {
+        scanResultRDD.get
+      }
       val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
         DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
       }.distinct.length
       val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
         nodeNumOfData,
         sqlContext.sparkContext)
-      val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray
-        .distinct)
-
+      val newRdd =
+        if (dataFrame.isDefined) {
+          new DataLoadCoalescedRDD[Row](
+            sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray.distinct)
+        } else {
+          new DataLoadCoalescedRDD[InternalRow](
+            sqlContext.sparkSession,
+            scanResultRDD.get,
+            nodes.toArray.distinct)
+        }
       new NewDataFrameLoaderRDD(
         sqlContext.sparkSession,
         new DataLoadResultImpl(),
         carbonLoadModel,
+        dataFrame.isDefined,
 
 Review comment:
   yes. done like that




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801391
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##
 @@ -304,11 +305,11 @@ object CarbonDataRDDFactory {
   def loadCarbonData(
   sqlContext: SQLContext,
   carbonLoadModel: CarbonLoadModel,
-  columnar: Boolean,
   partitionStatus: SegmentStatus = SegmentStatus.SUCCESS,
   overwriteTable: Boolean,
   hadoopConf: Configuration,
   dataFrame: Option[DataFrame] = None,
+  scanResultRdd : Option[RDD[InternalRow]],
 
 Review comment:
   done




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378800899
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,517 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, 
CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var logicalPlan: LogicalPlan,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var inputSqlString: String = null,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-setAuditTable(relation.carbonTable.getDatabaseName, 
relation.carbonTable.getTableName)
-val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-def containsLimit(plan: LogicalPlan): Boolean = {
-  plan find {
-case limit: GlobalLimit => true
-case other => false
-  } isDefined
+if (!tableInfoOp.isDefined) {
+  throw new RuntimeException(" table info must be present when logical 
relation exist")
 }
-
+// If logical plan is unresolved, need to convert it to resolved.
+

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378800852
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,517 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, 
CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var logicalPlan: LogicalPlan,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var inputSqlString: String = null,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-setAuditTable(relation.carbonTable.getDatabaseName, 
relation.carbonTable.getTableName)
-val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-def containsLimit(plan: LogicalPlan): Boolean = {
-  plan find {
-case limit: GlobalLimit => true
-case other => false
-  } isDefined
+if (!tableInfoOp.isDefined) {
+  throw new RuntimeException(" table info must be present when logical 
relation exist")
 }
-
+// If logical plan is unresolved, need to convert it to resolved.
+

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378800963
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,517 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, 
CarbonToSparkAdapter, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var logicalPlan: LogicalPlan,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var inputSqlString: String = null,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-setAuditTable(relation.carbonTable.getDatabaseName, 
relation.carbonTable.getTableName)
-val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-def containsLimit(plan: LogicalPlan): Boolean = {
-  plan find {
-case limit: GlobalLimit => true
-case other => false
-  } isDefined
+if (!tableInfoOp.isDefined) {
 
 Review comment:
   yes. done



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378771800
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##
 @@ -358,19 +359,41 @@ object CarbonDataRDDFactory {
 }
   }
     } else {
-      status = if (dataFrame.isEmpty && isSortTable &&
-                   carbonLoadModel.getRangePartitionColumn != null &&
-                   (sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) ||
-                    sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT))) {
-        DataLoadProcessBuilderOnSpark
-          .loadDataUsingRangeSort(sqlContext.sparkSession, carbonLoadModel, hadoopConf)
-      } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-        DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkSession,
-          dataFrame, carbonLoadModel, hadoopConf)
-      } else if (dataFrame.isDefined) {
-        loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
+      status = if (scanResultRdd.isDefined) {
 
 Review comment:
   ok




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378771218
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##
 @@ -834,4 +843,179 @@ object CommonUtil {
 displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+  fieldTypes: Seq[DataType],
+  outputArrayLength: Int): Array[AnyRef] = {
+val data = new Array[AnyRef](outputArrayLength)
+var i = 0
+val fieldTypesLen = fieldTypes.length
+while (i < fieldTypesLen) {
+  if (!row.isNullAt(i)) {
+fieldTypes(i) match {
+  case StringType =>
+data(i) = row.getString(i)
+  case d: DecimalType =>
+data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+  case arrayType : ArrayType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), 
arrayType)
+  case structType : StructType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+  structType.fields.length), structType)
+  case mapType : MapType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), 
mapType)
+  case other =>
+data(i) = row.get(i, other)
+}
+  }
+  i += 1
+}
+data
+  }
+
+  /**
+   * After converting complex objects to carbon objects, need to convert to 
byte array
+   *
+   * @param row
+   * @param fields
+   * @param dataFieldsWithComplexDataType
+   * @return
+   */
+  def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(
+  row: InternalRow,
+  fields: Seq[StructField],
+  dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): 
Array[AnyRef] = {
+val data = new Array[AnyRef](fields.size)
+val badRecordLogHolder = new BadRecordLogHolder();
+var i = 0
+val fieldTypesLen = fields.length
+while (i < fieldTypesLen) {
+  if (!row.isNullAt(i)) {
+fields(i).dataType match {
+  case StringType =>
+data(i) = 
DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+  DataTypes.STRING)
+  case d: DecimalType =>
+data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+  case arrayType : ArrayType =>
+val result = convertSparkComplexTypeToCarbonObject(row.get(i, 
arrayType), arrayType)
+// convert carbon complex object to byte array
+val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+val dataOutputStream: DataOutputStream = new 
DataOutputStream(byteArray)
+
dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+  .writeByteArray(result.asInstanceOf[ArrayObject],
+dataOutputStream,
+badRecordLogHolder)
+dataOutputStream.close()
+data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+  case structType : StructType =>
+val result = convertSparkComplexTypeToCarbonObject(row.get(i, 
structType), structType)
+// convert carbon complex object to byte array
+val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+val dataOutputStream: DataOutputStream = new 
DataOutputStream(byteArray)
+
dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
+  .writeByteArray(result.asInstanceOf[StructObject],
+dataOutputStream,
+badRecordLogHolder)
+dataOutputStream.close()
+data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
 
 Review comment:
   same as above




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378771252
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##
 @@ -834,4 +843,179 @@ object CommonUtil {
 displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+  fieldTypes: Seq[DataType],
+  outputArrayLength: Int): Array[AnyRef] = {
+val data = new Array[AnyRef](outputArrayLength)
+var i = 0
+val fieldTypesLen = fieldTypes.length
+while (i < fieldTypesLen) {
+  if (!row.isNullAt(i)) {
+fieldTypes(i) match {
+  case StringType =>
+data(i) = row.getString(i)
+  case d: DecimalType =>
+data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+  case arrayType : ArrayType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), 
arrayType)
+  case structType : StructType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+  structType.fields.length), structType)
+  case mapType : MapType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), 
mapType)
+  case other =>
+data(i) = row.get(i, other)
+}
+  }
+  i += 1
+}
+data
+  }
+
+  /**
+   * After converting complex objects to carbon objects, need to convert to 
byte array
+   *
+   * @param row
+   * @param fields
+   * @param dataFieldsWithComplexDataType
+   * @return
+   */
+  def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(
+  row: InternalRow,
+  fields: Seq[StructField],
+  dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): 
Array[AnyRef] = {
+val data = new Array[AnyRef](fields.size)
+val badRecordLogHolder = new BadRecordLogHolder();
+var i = 0
+val fieldTypesLen = fields.length
+while (i < fieldTypesLen) {
+  if (!row.isNullAt(i)) {
+fields(i).dataType match {
+  case StringType =>
+data(i) = 
DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+  DataTypes.STRING)
+  case d: DecimalType =>
+data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+  case arrayType : ArrayType =>
+val result = convertSparkComplexTypeToCarbonObject(row.get(i, 
arrayType), arrayType)
+// convert carbon complex object to byte array
+val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+val dataOutputStream: DataOutputStream = new 
DataOutputStream(byteArray)
+
dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+  .writeByteArray(result.asInstanceOf[ArrayObject],
+dataOutputStream,
+badRecordLogHolder)
+dataOutputStream.close()
+data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+  case structType : StructType =>
+val result = convertSparkComplexTypeToCarbonObject(row.get(i, 
structType), structType)
+// convert carbon complex object to byte array
+val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+val dataOutputStream: DataOutputStream = new 
DataOutputStream(byteArray)
+
dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[StructDataType]
+  .writeByteArray(result.asInstanceOf[StructObject],
+dataOutputStream,
+badRecordLogHolder)
+dataOutputStream.close()
+data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
+  case mapType : MapType =>
+val result = convertSparkComplexTypeToCarbonObject(row.get(i, 
mapType), mapType)
+// convert carbon complex object to byte array
+val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+val dataOutputStream: DataOutputStream = new 
DataOutputStream(byteArray)
+
dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+  .writeByteArray(result.asInstanceOf[ArrayObject],
+dataOutputStream,
+badRecordLogHolder)
+dataOutputStream.close()
+data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
 
 Review comment:
   same as above




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378771038
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##
 @@ -834,4 +843,179 @@ object CommonUtil {
 displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+      fieldTypes: Seq[DataType],
+      outputArrayLength: Int): Array[AnyRef] = {
+    val data = new Array[AnyRef](outputArrayLength)
+    var i = 0
+    val fieldTypesLen = fieldTypes.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType: ArrayType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), arrayType)
+          case structType: StructType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+              structType.fields.length), structType)
+          case mapType: MapType =>
+            data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), mapType)
+          case other =>
+            data(i) = row.get(i, other)
+        }
+      }
+      i += 1
+    }
+    data
+  }
+
+  /**
+   * After converting complex objects to carbon objects, need to convert to byte array
+   *
+   * @param row
+   * @param fields
+   * @param dataFieldsWithComplexDataType
+   * @return
+   */
+  def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(
+      row: InternalRow,
+      fields: Seq[StructField],
+      dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): Array[AnyRef] = {
+    val data = new Array[AnyRef](fields.size)
+    val badRecordLogHolder = new BadRecordLogHolder();
+    var i = 0
+    val fieldTypesLen = fields.length
+    while (i < fieldTypesLen) {
+      if (!row.isNullAt(i)) {
+        fields(i).dataType match {
+          case StringType =>
+            data(i) = DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(row.getString(i),
+              DataTypes.STRING)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case arrayType: ArrayType =>
+            val result = convertSparkComplexTypeToCarbonObject(row.get(i, arrayType), arrayType)
+            // convert carbon complex object to byte array
+            val byteArray: ByteArrayOutputStream = new ByteArrayOutputStream()
+            val dataOutputStream: DataOutputStream = new DataOutputStream(byteArray)
+            dataFieldsWithComplexDataType(fields(i).name).asInstanceOf[ArrayDataType]
+              .writeByteArray(result.asInstanceOf[ArrayObject],
+                dataOutputStream,
+                badRecordLogHolder)
+            dataOutputStream.close()
+            data(i) = byteArray.toByteArray.asInstanceOf[AnyRef]
 
 Review comment:
   The two branches cast to different types with asInstanceOf[], so an extracted method would still need an if/else. Not useful here.
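   For illustration, a sketch of the extraction being discussed (the helper name is hypothetical, not part of the PR); because the cast targets differ per complex type, the helper still has to branch internally:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// The Carbon types below (GenericDataType, ArrayDataType, StructDataType, ArrayObject,
// StructObject, BadRecordLogHolder) are the ones already referenced in the diff above.
def writeComplexAsBytes(
    result: AnyRef,
    complexType: GenericDataType[_],
    badRecordLogHolder: BadRecordLogHolder): AnyRef = {
  val byteArray = new ByteArrayOutputStream()
  val dataOutputStream = new DataOutputStream(byteArray)
  complexType match {
    case arrayType: ArrayDataType =>
      arrayType.writeByteArray(result.asInstanceOf[ArrayObject],
        dataOutputStream, badRecordLogHolder)
    case structType: StructDataType =>
      structType.writeByteArray(result.asInstanceOf[StructObject],
        dataOutputStream, badRecordLogHolder)
  }
  dataOutputStream.close()
  byteArray.toByteArray.asInstanceOf[AnyRef]
}
```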




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378754084
 
 

 ##
 File path: 
integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
 ##
 @@ -150,10 +150,14 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     sql("drop table if exists load32000chardata")
     sql("drop table if exists load32000chardata_dup")
     sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
-    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
+    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS " +
+      "carbondata tblproperties('local_dictionary_enable'='false')")
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
+    // Previously converter step was checking more than 32k length and throwing exception.
+    // Now, due to local dictionary is true. Insert will not fail.
+    // when local dictionary is false, insert will fail at write step
     intercept[Exception] {
-      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,''),mes1 from load32000chardata").show()
+      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,load32000chardata.dim2),mes1 from load32000chardata").show()
 
 Review comment:
   I have set the bad-record handling and made it the same as the old test case; otherwise nobody else will understand it :)
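   For reference, a hedged sketch of that variant inside the existing test (the property name and FAIL value are my assumptions about CarbonData's bad-record handling, not copied from this PR):

```scala
// Force strict bad-record handling so the >32k string fails the same way as before.
CarbonProperties.getInstance()
  .addProperty("carbon.bad.records.action", "FAIL")
intercept[Exception] {
  sql("insert into load32000chardata_dup select dim1, " +
    "concat(load32000chardata.dim2, ''), mes1 from load32000chardata").show()
}
```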
   
   
   




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-13 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378751755
 
 

 ##
 File path: 
integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
 ##
 @@ -150,10 +150,14 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     sql("drop table if exists load32000chardata")
     sql("drop table if exists load32000chardata_dup")
     sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
-    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS carbondata")
+    sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 int) STORED AS " +
+      "carbondata tblproperties('local_dictionary_enable'='false')")
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
+    // Previously converter step was checking more than 32k length and throwing exception.
+    // Now, due to local dictionary is true. Insert will not fail.
+    // when local dictionary is false, insert will fail at write step
     intercept[Exception] {
-      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,''),mes1 from load32000chardata").show()
+      sql("insert into load32000chardata_dup select dim1,concat(load32000chardata.dim2,load32000chardata.dim2),mes1 from load32000chardata").show()
 
 Review comment:
   In the new flow there is no converter step, so the string length is no longer checked against the 32k limit there. The write step did not fail because the value is below 32k after compression, so I concatenated the 32k value with itself (32k + 32k) so that it still exceeds the limit even after compression.
   
   I can also set the bad-record handling and make it the same as the old test case.




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-11 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378034178
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, 
Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, 
UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+* insert into with df, doesn't use logical plan
+*
+*/
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var dataFrame: DataFrame,
+var inputSqlString: String = null,
+var updateModel: Option[UpdateTableModel] = None,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext) {
+
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  def process(sparkSession: SparkSession): Seq[Row] = {
+ThreadLocalSessionInfo
+  
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
+val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) 
= CommonLoadUtils
+  .processMetadataCommon(
+sparkSession,
+databaseNameOp,
+tableName,
+tableInfoOp,
+partition)
+this.sizeInBytes = sizeInBytes
+this.table = table
+this.logicalPartitionRelation = logicalPartitionRelation
+this.finalPartition = finalPartition
+val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+val hadoopConf = sparkSession.sessionState.newHadoopConf()
+carbonProperty.addProperty("zookeeper.enable.lock", "false")
+val factPath = ""
+currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+val optionsFinal: util.Map[String, String] =
+  CommonLoadUtils.getFinalLoadOptions(
+  carbonProperty, table, options)
+val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+  hadoopConf,
+  factPath,
+  optionsFinal,
+  parentTablePath,
+  table,
+  is

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-10 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r377000854
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, 
Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, 
UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+* insert into with df, doesn't use logical plan
+*
+*/
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var dataFrame: DataFrame,
+var inputSqlString: String = null,
+var updateModel: Option[UpdateTableModel] = None,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext) {
+
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  def process(sparkSession: SparkSession): Seq[Row] = {
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-10 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r377000735
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, 
Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var logicalPlan: LogicalPlan,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var inputSqlString: String = null,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-setAuditTable(relation.carbonTable.getDatabaseName, 
relation.carbonTable.getTableName)
-val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-def containsLimit(plan: LogicalPlan): Boolean = {
-  plan find {
-case limit: GlobalLimit => true
-case other => false
-  } isDefined
+if (!tableInfoOp.isDefined) {
+  throw new RuntimeException(" table info must be present wh
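
   As an aside, a minimal hypothetical sketch (names and message invented, not the
   PR code) of the precondition style used here: fail fast when the caller did not
   resolve table info before building the insert command.

     def requireTableInfo[T](tableInfoOp: Option[T]): T =
       tableInfoOp.getOrElse(sys.error("table info was not resolved for the insert command"))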

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-10 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376996464
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, 
Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, 
UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+* insert into with df, doesn't use logical plan
+*
+*/
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var dataFrame: DataFrame,
+var inputSqlString: String = null,
+var updateModel: Option[UpdateTableModel] = None,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext) {
+
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  def process(sparkSession: SparkSession): Seq[Row] = {
+ThreadLocalSessionInfo
+  
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
+val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) 
= CommonLoadUtils
+  .processMetadataCommon(
+sparkSession,
+databaseNameOp,
+tableName,
+tableInfoOp,
+partition)
+this.sizeInBytes = sizeInBytes
+this.table = table
+this.logicalPartitionRelation = logicalPartitionRelation
+this.finalPartition = finalPartition
+val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+val hadoopConf = sparkSession.sessionState.newHadoopConf()
+carbonProperty.addProperty("zookeeper.enable.lock", "false")
+val factPath = ""
+currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+val optionsFinal: util.Map[String, String] =
+  CommonLoadUtils.getFinalLoadOptions(
+  carbonProperty, table, options)
+val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+  hadoopConf,
+  factPath,
+  optionsFinal,
+  parentTablePath,
+  table,
+  is

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-10 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376996416
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, 
Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, 
UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+* insert into with df, doesn't use logical plan
+*
+*/
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var dataFrame: DataFrame,
+var inputSqlString: String = null,
+var updateModel: Option[UpdateTableModel] = None,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext) {
+
+  var table: CarbonTable = _
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-10 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376361675
 
 

 ##
 File path: 
processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
 ##
 @@ -32,7 +33,7 @@
  * Generic DataType interface which will be used while data loading for 
complex types like Array &
  * Struct
  */
-public interface GenericDataType {
+public interface GenericDataType extends Serializable {
 
 Review comment:
   The primitive type logger was not used, so I removed the field itself.
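
   For context, a minimal sketch (invented names, not the PR code) of why an unused
   logger matters once the type becomes Serializable: every non-transient field is
   pulled into task serialization, so such a field is best removed or marked transient.

     trait GenericDataTypeLike extends Serializable {
       def getName: String
     }

     class PrimitiveDataTypeLike(name: String) extends GenericDataTypeLike {
       // A field like `val logger = LogServiceFactory.getLogService(...)` would be
       // serialized along with the instance and can fail with NotSerializableException
       // on the executor side; dropping it (or marking it @transient) avoids that.
       override def getName: String = name
     }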


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376680696
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, 
Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, 
UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+* insert into with df, doesn't use logical plan
+*
+*/
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var dataFrame: DataFrame,
+var inputSqlString: String = null,
+var updateModel: Option[UpdateTableModel] = None,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext) {
+
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  def process(sparkSession: SparkSession): Seq[Row] = {
+ThreadLocalSessionInfo
+  
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
+val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) 
= CommonLoadUtils
+  .processMetadataCommon(
+sparkSession,
+databaseNameOp,
+tableName,
+tableInfoOp,
+partition)
+this.sizeInBytes = sizeInBytes
+this.table = table
+this.logicalPartitionRelation = logicalPartitionRelation
+this.finalPartition = finalPartition
+val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+val hadoopConf = sparkSession.sessionState.newHadoopConf()
+carbonProperty.addProperty("zookeeper.enable.lock", "false")
+val factPath = ""
+currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+val optionsFinal: util.Map[String, String] =
+  CommonLoadUtils.getFinalLoadOptions(
+  carbonProperty, table, options)
 
 Review comment:
   ok done
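
   To illustrate the intent only (assumed behaviour, not the actual CommonLoadUtils
   code): the final load options are resolved by layering statement-level options
   over table-level and system-level defaults, roughly like this.

     def resolveLoadOptions(
         systemDefaults: Map[String, String],
         tableDefaults: Map[String, String],
         statementOptions: Map[String, String]): Map[String, String] = {
       // later maps win, so per-statement options override the defaults
       systemDefaults ++ tableDefaults ++ statementOptions
     }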


This is an automated message from the Apache Git Service.
To respond to the message, p

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376471953
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, 
Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var logicalPlan: LogicalPlan,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var inputSqlString: String = null,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-setAuditTable(relation.carbonTable.getDatabaseName, 
relation.carbonTable.getTableName)
-val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-def containsLimit(plan: LogicalPlan): Boolean = {
-  plan find {
-case limit: GlobalLimit => true
-case other => false
-  } isDefined
+if (!tableInfoOp.isDefined) {
+  throw new RuntimeException(" table info must be present wh

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376361675
 
 

 ##
 File path: 
processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
 ##
 @@ -32,7 +33,7 @@
  * Generic DataType interface which will be used while data loading for 
complex types like Array &
  * Struct
  */
-public interface GenericDataType {
+public interface GenericDataType extends Serializable {
 
 Review comment:
   The primitive type logger was not used, so I removed the field itself.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376357834
 
 

 ##
 File path: 
processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
 ##
 @@ -110,10 +113,10 @@ private AbstractDataLoadProcessorStep 
buildInternalForNoSort(CarbonIterator[] in
*/
   private AbstractDataLoadProcessorStep buildInternalWithNoConverter(
   CarbonIterator[] inputIterators, CarbonDataLoadConfiguration 
configuration,
-  SortScopeOptions.SortScope sortScope) {
+  SortScopeOptions.SortScope sortScope, boolean withoutReArrange) {
 
 Review comment:
   That is a bigger change; maybe refactor it in another PR.
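
   A minimal sketch (hypothetical step names, not the builder code) of what the flag
   controls: it decides whether a re-arrange step is inserted into the no-converter
   loading pipeline.

     def buildNoConverterSteps(withoutReArrange: Boolean): List[String] = {
       val base = List("InputProcessorStep", "SortProcessorStep", "DataWriterStep")
       // when re-arranging is required, an extra step reorders columns to schema order
       if (withoutReArrange) base else "RowReArrangeStep" :: base
     }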


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376357574
 
 

 ##
 File path: 
processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
 ##
 @@ -32,7 +33,7 @@
  * Generic DataType interface which will be used while data loading for 
complex types like Array &
  * Struct
  */
-public interface GenericDataType {
+public interface GenericDataType extends Serializable {
 
 Review comment:
   ok done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376356520
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 ##
 @@ -509,30 +514,17 @@ private class CarbonOutputWriter(path: String,
 
   // TODO Implement writesupport interface to support writing Row directly to 
recordwriter
   def writeCarbon(row: InternalRow): Unit = {
-val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
-var i = 0
-val fieldTypesLen = fieldTypes.length
-while (i < fieldTypesLen) {
-  if (!row.isNullAt(i)) {
-fieldTypes(i) match {
-  case StringType =>
-data(i) = row.getString(i)
-  case d: DecimalType =>
-data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-  case other =>
-data(i) = row.get(i, other)
-}
-  }
-  i += 1
-}
+val totalLength = fieldTypes.length + partitionData.length
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376356414
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 ##
 @@ -509,30 +514,17 @@ private class CarbonOutputWriter(path: String,
 
   // TODO Implement writesupport interface to support writing Row directly to 
recordwriter
   def writeCarbon(row: InternalRow): Unit = {
-val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
-var i = 0
-val fieldTypesLen = fieldTypes.length
-while (i < fieldTypesLen) {
-  if (!row.isNullAt(i)) {
-fieldTypes(i) match {
-  case StringType =>
-data(i) = row.getString(i)
-  case d: DecimalType =>
-data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
-  case other =>
-data(i) = row.get(i, other)
-}
-  }
-  i += 1
-}
+val totalLength = fieldTypes.length + partitionData.length
+val data: Array[AnyRef] = 
CommonUtil.getObjectArrayFromInternalRowAndConvertComplexType(row,
+  fieldTypes,
+  totalLength)
 if (partitionData.length > 0) {
-  System.arraycopy(partitionData, 0, data, fieldTypesLen, 
partitionData.length)
+  System.arraycopy(partitionData, 0, data, fieldTypes.length, 
partitionData.length)
 
 Review comment:
   yes
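
   To make the layout explicit, a tiny sketch with invented values: field values fill
   the front of the array and partition values are appended after them, which is
   exactly what the arraycopy starting at offset fieldTypes.length does.

     val fieldValues: Array[AnyRef] = Array("name", Integer.valueOf(42))
     val partitionData: Array[AnyRef] = Array("2020-02-07")
     val data = new Array[AnyRef](fieldValues.length + partitionData.length)
     System.arraycopy(fieldValues, 0, data, 0, fieldValues.length)
     System.arraycopy(partitionData, 0, data, fieldValues.length, partitionData.length)
     // data now holds Array("name", 42, "2020-02-07")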


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376355932
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
 ##
 @@ -76,11 +76,16 @@ case class CarbonCreateTableAsSelectCommand(
 .createCarbonDataSourceHadoopRelation(sparkSession,
   TableIdentifier(tableName, Option(dbName)))
   // execute command to load data into carbon table
-  loadCommand = CarbonInsertIntoCommand(
-carbonDataSourceHadoopRelation,
-query,
-overwrite = false,
-partition = Map.empty)
+  loadCommand = CarbonInsertIntoCommand(Some(carbonDataSourceHadoopRelation
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376355884
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 ##
 @@ -316,16 +315,15 @@ private[sql] case class CarbonProjectForUpdateCommand(
 
 val header = getHeader(carbonRelation, plan)
 
-CarbonLoadDataCommand(
+CarbonInsertIntoWithDf(
   Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
   carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
-  null,
-  Seq(),
   Map(("fileheader" -> header)),
   false,
   null,
-  Some(dataFrame),
-  Some(updateTableModel)).run(sparkSession)
+  dataFrame,
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376354706
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadParams.scala
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.mutable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.command.UpdateTableModel
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+/*
+* intermediate object to pass between load functions
+*/
+case class CarbonLoadParams(sparkSession: SparkSession,
 
 Review comment:
   The load model would become bulky, and it is used in other places as well; this
   parameter object is needed only for this flow.
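
   A minimal sketch (hypothetical fields) of that trade-off: rather than growing the
   widely shared load model, the values needed only by this insert flow travel
   together in a small parameter object.

     case class LoadFlowParams(
         segmentId: String,
         isOverwriteTable: Boolean,
         headerColumns: Seq[String])

     def writeSegment(params: LoadFlowParams): Unit = {
       // helpers in this flow take one object instead of a long argument list
       println(s"writing segment ${params.segmentId}, overwrite = ${params.isOverwriteTable}")
     }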


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376351172
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, 
Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var logicalPlan: LogicalPlan,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var inputSqlString: String = null,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-setAuditTable(relation.carbonTable.getDatabaseName, 
relation.carbonTable.getTableName)
-val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-def containsLimit(plan: LogicalPlan): Boolean = {
-  plan find {
-case limit: GlobalLimit => true
-case other => false
-  } isDefined
+if (!tableInfoOp.isDefined) {
+  throw new RuntimeException(" table info must be present wh

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376351144
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, 
Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var logicalPlan: LogicalPlan,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var inputSqlString: String = null,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-setAuditTable(relation.carbonTable.getDatabaseName, 
relation.carbonTable.getTableName)
-val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-def containsLimit(plan: LogicalPlan): Boolean = {
-  plan find {
-case limit: GlobalLimit => true
-case other => false
-  } isDefined
+if (!tableInfoOp.isDefined) {
+  throw new RuntimeException(" table info must be present wh

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376349852
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##
 @@ -17,85 +17,518 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, Dataset, Row, 
SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
-import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataCommand}
-import org.apache.spark.storage.StorageLevel
-
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-
-case class CarbonInsertIntoCommand(
-relation: CarbonDatasourceHadoopRelation,
-child: LogicalPlan,
-overwrite: Boolean,
-partition: Map[String, Option[String]])
+import java.text.SimpleDateFormat
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Dataset, 
Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, 
DataLoadTableFileMapping}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.CausedBy
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo, TableSchema}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/*
+* insert into without df, by just using logical plan
+*
+*/
+case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var logicalPlan: LogicalPlan,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var inputSqlString: String = null,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext)
   extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonLoadDataCommand = _
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var scanResultRdd: RDD[InternalRow] = _
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var isInsertIntoWithConverterFlow: Boolean = false
+
+  var dataFrame: DataFrame = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-setAuditTable(relation.carbonTable.getDatabaseName, 
relation.carbonTable.getTableName)
-val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-def containsLimit(plan: LogicalPlan): Boolean = {
-  plan find {
-case limit: GlobalLimit => true
-case other => false
-  } isDefined
+if (!tableInfoOp.isDefined) {
+  throw new RuntimeException(" table info must be present wh

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376349173
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376348551
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376347781
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376346935
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 ##

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376343994
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##
 @@ -957,24 +987,36 @@ object CarbonDataRDDFactory {
   private def loadDataFrame(
   sqlContext: SQLContext,
   dataFrame: Option[DataFrame],
+  scanResultRDD: Option[RDD[InternalRow]],
   carbonLoadModel: CarbonLoadModel
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
 try {
-  val rdd = dataFrame.get.rdd
-
+  val rdd = if (dataFrame.isDefined) {
+dataFrame.get.rdd
+  } else {
+scanResultRDD.get
+  }
   val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
 DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
   }.distinct.length
   val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
 nodeNumOfData,
 sqlContext.sparkContext)
-  val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, 
nodes.toArray
-.distinct)
-
+  val newRdd =
+if (dataFrame.isDefined) {
+  new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, 
dataFrame.get.rdd, nodes.toArray
+.distinct)
+} else {
+  new DataLoadCoalescedRDD[InternalRow](sqlContext.sparkSession,
+scanResultRDD.get,
+nodes.toArray
+  .distinct)
 
 Review comment:
   Yes, this flow is for local sort and no sort; there is a plan to route the no-sort case 
through the global sort flow later.
   
   For local sort we run one task per node, hence this coalescing logic.
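   
   A minimal standalone sketch of that idea, with illustrative names only (it does not use 
the real DataLoadCoalescedRDD or DataLoadPartitionCoalescer APIs): partitions are grouped 
by their preferred host so that each node ends up with a single local-sort load task.
   
   object NodeCoalesceSketch {
     // Group partition indices by preferred host; each host then runs exactly one load task.
     def coalesceByNode(partitionHosts: Seq[(Int, String)]): Map[String, Seq[Int]] =
       partitionHosts.groupBy { case (_, host) => host }
         .map { case (host, parts) => host -> parts.map(_._1) }
   
     def main(args: Array[String]): Unit = {
       // Hypothetical preferred locations for four input partitions.
       val partitionHosts = Seq(0 -> "node-1", 1 -> "node-2", 2 -> "node-1", 3 -> "node-3")
       coalesceByNode(partitionHosts).foreach { case (host, parts) =>
         println(s"$host -> one task over partitions ${parts.mkString(", ")}")
       }
     }
   }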


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376342021
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##
 @@ -957,24 +987,36 @@ object CarbonDataRDDFactory {
   private def loadDataFrame(
   sqlContext: SQLContext,
   dataFrame: Option[DataFrame],
+  scanResultRDD: Option[RDD[InternalRow]],
   carbonLoadModel: CarbonLoadModel
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
 try {
-  val rdd = dataFrame.get.rdd
-
+  val rdd = if (dataFrame.isDefined) {
+dataFrame.get.rdd
+  } else {
+scanResultRDD.get
+  }
   val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
 DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
   }.distinct.length
   val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
 nodeNumOfData,
 sqlContext.sparkContext)
-  val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, 
nodes.toArray
-.distinct)
-
+  val newRdd =
+if (dataFrame.isDefined) {
+  new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, 
dataFrame.get.rdd, nodes.toArray
+.distinct)
+} else {
+  new DataLoadCoalescedRDD[InternalRow](sqlContext.sparkSession,
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376341490
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##
 @@ -957,24 +987,36 @@ object CarbonDataRDDFactory {
   private def loadDataFrame(
   sqlContext: SQLContext,
   dataFrame: Option[DataFrame],
+  scanResultRDD: Option[RDD[InternalRow]],
 
 Review comment:
   ok


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376339989
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 ##
 @@ -179,6 +182,93 @@ object DataLoadProcessBuilderOnSpark {
 updateLoadStatus(model, partialSuccessAccum)
   }
 
+  def insertDataUsingGlobalSortWithInternalRow(sparkSession: SparkSession,
+  scanResultRDD : RDD[InternalRow],
+  model: CarbonLoadModel,
+  hadoopConf: Configuration): Array[(String, (LoadMetadataDetails, 
ExecutionErrors))] = {
+val originRDD = scanResultRDD
+
+val sc = sparkSession.sparkContext
+val modelBroadcast = sc.broadcast(model)
+val partialSuccessAccum = sc.longAccumulator("Partial Success Accumulator")
+val sortStepRowCounter = sc.longAccumulator("Sort Processor Accumulator")
+val writeStepRowCounter = sc.longAccumulator("Write Processor Accumulator")
+
+hadoopConf
+  .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, 
sparkSession.sparkContext.appName)
+
+val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
+val configuration = DataLoadProcessBuilder.createConfiguration(model)
+
+// 1. Convert internalRow to object array
+val fields = Util
+  
.convertToSparkSchemaFromColumnSchema(model.getCarbonDataLoadSchema.getCarbonTable,
 true)
+  .fields
+  .toSeq
+
+val dataTypes = fields.map(field => field.dataType)
+val map: mutable.Map[String, GenericDataType[_]] = mutable.Map[String, 
GenericDataType[_]]()
+CommonUtil.convertComplexDataType(map, configuration)
+val rdd = originRDD.map { internalRow =>
+  
CommonUtil.getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(internalRow,
+fields,
+map)
+}
+
+// 2. sort
+var numPartitions = CarbonDataProcessorUtil.getGlobalSortPartitions(
+  
configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS))
+if (numPartitions <= 0) {
+  numPartitions = originRDD.partitions.length
+}
+// If the number of partitions is greater than 1, the sortBy operator triggers an action
+// (sample), so cache the RDD here to avoid reading the input and converting it again.
+if (numPartitions > 1) {
+  rdd.persist(StorageLevel.fromString(
+CarbonProperties.getInstance().getGlobalSortRddStorageLevel()))
+}
+val sortColumnsLength = 
model.getCarbonDataLoadSchema.getCarbonTable.getSortColumns.size()
+val sortColumnDataTypes = dataTypes.take(sortColumnsLength)
+val rowComparator = 
GlobalSortHelper.generateRowComparator(sortColumnDataTypes)
+val sortRDD = rdd.sortBy(x => getKey(x, sortColumnsLength, 
sortColumnDataTypes),
 
 Review comment:
   ok
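   
   For reference, a minimal sketch of the sort-key idea in the quoted snippet, under the 
assumption that each row is already an Object array whose leading fields are the sort 
columns (the real GlobalSortHelper comparator is not reproduced here):
   
   object GlobalSortKeySketch {
     // Key each row by its first sortColumnsLength fields, joined by a separator character.
     def sortKey(row: Array[Any], sortColumnsLength: Int): String =
       row.take(sortColumnsLength).mkString("\u0001")
   
     def main(args: Array[String]): Unit = {
       val rows = Seq(Array[Any]("b", 2, "x"), Array[Any]("a", 1, "y"))
       // The real flow calls rdd.sortBy(...); a local sortBy shows the same keying.
       rows.sortBy(row => sortKey(row, 2)).foreach(row => println(row.mkString(", ")))
     }
   }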


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376340053
 
 

 ##
 File path: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
 ##
 @@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.text.SimpleDateFormat
+import java.util
+
+import org.apache.spark.sql.{AnalysisException, CarbonUtils, DataFrame, 
Dataset, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, 
UpdateTableModel}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, SparkUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.OperationContext
+import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, 
CarbonLoaderUtil}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+
+/*
+ * Insert into with a DataFrame; does not go through the logical plan.
+ */
+case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
+tableName: String,
+options: Map[String, String],
+isOverwriteTable: Boolean,
+var dimFilesPath: Seq[DataLoadTableFileMapping] = Seq(),
+var dataFrame: DataFrame,
+var inputSqlString: String = null,
+var updateModel: Option[UpdateTableModel] = None,
+var tableInfoOp: Option[TableInfo] = None,
+var internalOptions: Map[String, String] = Map.empty,
+var partition: Map[String, Option[String]] = Map.empty,
+var operationContext: OperationContext = new OperationContext) {
+
+  var table: CarbonTable = _
+
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
+  var currPartitions: util.List[PartitionSpec] = _
+
+  var parentTablePath: String = _
+
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  var finalPartition: Map[String, Option[String]] = Map.empty
+
+  var timeStampFormat: SimpleDateFormat = _
+
+  var dateFormat: SimpleDateFormat = _
+
+  def process(sparkSession: SparkSession): Seq[Row] = {
+ThreadLocalSessionInfo
+  
.setConfigurationToCurrentThread(sparkSession.sessionState.newHadoopConf())
+val (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition) 
= CommonLoadUtils
+  .processMetadataCommon(
+sparkSession,
+databaseNameOp,
+tableName,
+tableInfoOp,
+partition)
+this.sizeInBytes = sizeInBytes
+this.table = table
+this.logicalPartitionRelation = logicalPartitionRelation
+this.finalPartition = finalPartition
+val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+val hadoopConf = sparkSession.sessionState.newHadoopConf()
+carbonProperty.addProperty("zookeeper.enable.lock", "false")
+val factPath = ""
+currPartitions = CommonLoadUtils.getCurrentParitions(sparkSession, table)
+CommonLoadUtils.setNumberOfCoresWhileLoading(sparkSession, carbonProperty)
+val optionsFinal: util.Map[String, String] =
+  CommonLoadUtils.getFinalLoadOptions(
+  carbonProperty, table, options)
+val carbonLoadModel: CarbonLoadModel = CommonLoadUtils.prepareLoadModel(
+  hadoopConf,
+  factPath,
+  optionsFinal,
+  parentTablePath,
+  table,
+  is

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376339644
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
 ##
 @@ -834,4 +843,179 @@ object CommonUtil {
 displaySize
   }
 
+  def getObjectArrayFromInternalRowAndConvertComplexType(row: InternalRow,
+  fieldTypes: Seq[DataType],
+  outputArrayLength: Int): Array[AnyRef] = {
+val data = new Array[AnyRef](outputArrayLength)
+var i = 0
+val fieldTypesLen = fieldTypes.length
+while (i < fieldTypesLen) {
+  if (!row.isNullAt(i)) {
+fieldTypes(i) match {
+  case StringType =>
+data(i) = row.getString(i)
+  case d: DecimalType =>
+data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+  case arrayType : ArrayType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getArray(i), 
arrayType)
+  case structType : StructType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getStruct(i,
+  structType.fields.length), structType)
+  case mapType : MapType =>
+data(i) = convertSparkComplexTypeToCarbonObject(row.getMap(i), 
mapType)
+  case other =>
+data(i) = row.get(i, other)
+}
+  }
+  i += 1
+}
+data
+  }
+
+  /**
+   * After converting complex values to Carbon objects, they still need to be
+   * converted to byte arrays for the global sort flow.
+   *
+   * @param row the Spark internal row to convert
+   * @param fields the Spark schema fields describing the row
+   * @param dataFieldsWithComplexDataType complex columns mapped to their Carbon data types
+   * @return the row converted to an object array
+   */
+  def getObjectArrayFromInternalRowAndConvertComplexTypeForGlobalSort(
+  row: InternalRow,
+  fields: Seq[StructField],
+  dataFieldsWithComplexDataType: Map[String, GenericDataType[_]]): 
Array[AnyRef] = {
 
 Review comment:
   No, not always. A primitive measure type stays a plain object here; this only converts 
the internal row to an object array. The value is turned into a byte array later, in the 
write step.
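   
   A small illustration of those two stages (a sketch only; these are not Carbon APIs): the 
measure travels as a boxed object after this conversion, and only the write step turns it 
into a byte array.
   
   import java.nio.ByteBuffer
   
   object MeasureObjectToBytesSketch {
     // Stage 1 (output of this conversion): the measure is kept as a plain JVM object.
     def asObject(value: Long): AnyRef = java.lang.Long.valueOf(value)
   
     // Stage 2 (write step): the object is serialised to its byte representation.
     def asBytes(measure: AnyRef): Array[Byte] = measure match {
       case l: java.lang.Long =>
         ByteBuffer.allocate(java.lang.Long.BYTES).putLong(l.longValue()).array()
       case other => other.toString.getBytes("UTF-8")
     }
   
     def main(args: Array[String]): Unit = {
       val boxed = asObject(42L)
       println(asBytes(boxed).length) // prints 8 once the write step has serialised it
     }
   }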


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376338167
 
 

 ##
 File path: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 ##
 @@ -179,6 +182,93 @@ object DataLoadProcessBuilderOnSpark {
 updateLoadStatus(model, partialSuccessAccum)
   }
 
+  def insertDataUsingGlobalSortWithInternalRow(sparkSession: SparkSession,
 
 Review comment:
   OK, I will follow this guideline; the auto-refactor had kept it on the same line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376337691
 
 

 ##
 File path: 
integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java
 ##
 @@ -76,9 +76,21 @@ public static boolean 
isBlockWithoutBlockletInfoExists(List sp
 } else if 
(org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType))
 {
   return DataTypes.createDecimalType();
 } else if (carbonDataType == 
org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+  if (forGlobalSort) {
 
 Review comment:
   It is required: in MeasureFieldConverterImpl we convert it to a long value in the 
getNoDictionaryValueBasedOnDataType method.
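   
   A hedged sketch of why the Spark-side schema keeps TIMESTAMP as a long for this flow: 
the downstream measure converter works with an epoch-based long rather than a timestamp 
object (the pattern and millisecond unit below are assumptions for illustration).
   
   import java.text.SimpleDateFormat
   
   object TimestampAsLongSketch {
     // Parse a timestamp string and keep it as an epoch value (a long), not a Timestamp object.
     def toEpochMillis(value: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long =
       new SimpleDateFormat(pattern).parse(value).getTime
   
     def main(args: Array[String]): Unit = {
       println(toEpochMillis("2020-02-07 10:30:00"))
     }
   }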


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow

2020-02-07 Thread GitBox
ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] 
Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r376334978
 
 

 ##
 File path: 
processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
 ##
 @@ -334,6 +356,42 @@ private CarbonRowBatch getBatch() {
   return newData;
 }
 
+private Object[] convertToNoDictionaryToBytesWithoutReArrange(Object[] 
data,
 
 Review comment:
   I checked; this is different. The original method re-orders the data based on the 
no-dictionary mapping when ordered data is present, and I don't want to use that mapping 
here. The complex-type logic is the same, so I may extract a common method for that part.
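   
   To make the difference concrete, a toy sketch (not the actual 
InputProcessorStepWithNoConverterImpl code): the original path re-orders fields through 
the no-dictionary mapping before producing bytes, while the new variant keeps the incoming 
field order.
   
   object ReArrangeSketch {
     private def toBytes(value: Any): Array[Byte] = value.toString.getBytes("UTF-8")
   
     // Original flow: output position i is filled from mapping(i) before conversion.
     def convertWithMapping(data: Array[Any], mapping: Array[Int]): Array[Array[Byte]] =
       mapping.map(src => toBytes(data(src)))
   
     // Optimized insert flow: no re-arrangement, the incoming field order is preserved.
     def convertWithoutReArrange(data: Array[Any]): Array[Array[Byte]] =
       data.map(toBytes)
   
     def main(args: Array[String]): Unit = {
       val row = Array[Any]("name", 25, "city")
       println(convertWithMapping(row, Array(2, 0, 1)).map(new String(_, "UTF-8")).mkString(", "))
       println(convertWithoutReArrange(row).map(new String(_, "UTF-8")).mkString(", "))
     }
   }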


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services