[CARBONDATA-2093] Use small file feature of global sort to minimise the carbondata file count
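When a partitioned table is loaded from a directory that contains many small input files, the load now goes through the same small-file handling as the global sort flow: csvFileScanRDD bin-packs the CSV splits into a few Spark partitions instead of one task per file, so each segment ends up with far fewer carbondata files, and SortParameters honours the writing-cores count passed down from the partition load. A condensed sketch of how the behaviour is exercised, distilled from the new test cases (table and directory names are illustrative, not part of the patch):

    // pack small input splits on the read path too; the tests reset this afterwards
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
    sql("CREATE TABLE small_files_demo (id INT, name STRING, age INT) " +
      "PARTITIONED BY (city STRING) STORED BY 'org.apache.carbondata.format'")
    // smallFilesDir holds ~100 tiny CSVs; the load groups them into a handful of tasks,
    // so segment 0 contains far fewer carbondata files than there were input files
    sql(s"LOAD DATA LOCAL INPATH '$smallFilesDir' INTO TABLE small_files_demo")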
This closes #1876

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e527c059
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e527c059
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e527c059

Branch: refs/heads/branch-1.3
Commit: e527c059e81e58503d568c82a3e7ac822a8a5b47
Parents: 8875775
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Sun Jan 28 20:37:21 2018 +0530
Committer: QiangCai <qiang...@qq.com>
Committed: Sat Feb 3 16:36:30 2018 +0800

----------------------------------------------------------------------
 .../StandardPartitionTableLoadingTestCase.scala |  77 ++++++++++-
 .../load/DataLoadProcessBuilderOnSpark.scala    | 130 +------------------
 .../carbondata/spark/util/DataLoadingUtil.scala | 127 ++++++++++++++++++
 .../management/CarbonLoadDataCommand.scala      |  94 ++++++--------
 .../sort/sortdata/SortParameters.java           |   4 +
 5 files changed, 249 insertions(+), 183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 16f252b..669d6e7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -16,11 +16,12 @@
  */
 package org.apache.carbondata.spark.testsuite.standardpartition
 
-import java.io.{File, IOException}
+import java.io.{File, FileWriter, IOException}
 import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
@@ -30,7 +31,8 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
   var executorService: ExecutorService = _
@@ -409,6 +411,75 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       sql("select * from casesensitivepartition where empno=17"))
   }
 
+  test("Partition LOAD with small files") {
+    sql("DROP TABLE IF EXISTS smallpartitionfiles")
+    sql(
+      """
+        | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val inputPath = new File("target/small_files").getCanonicalPath
+    val folder = new File(inputPath)
+    if (folder.exists()) {
+      FileUtils.deleteDirectory(folder)
+    }
+    folder.mkdir()
+    for (i <- 0 to 100) {
+      val file = s"$folder/file$i.csv"
+      val writer = new FileWriter(file)
+      writer.write("id,name,city,age\n")
+      writer.write(s"$i,name_$i,city_${i % 5},${ i % 100 }")
+      writer.close()
+    }
+    sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
+    FileUtils.deleteDirectory(folder)
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+    assert(new File(segmentDir).listFiles().length < 50)
+  }
+
+  test("verify partition read with small files") {
+    try {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
+      sql("DROP TABLE IF EXISTS smallpartitionfilesread")
+      sql(
+        """
+          | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age INT) PARTITIONED BY
+          | (city STRING)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      val inputPath = new File("target/small_files").getCanonicalPath
+      val folder = new File(inputPath)
+      if (folder.exists()) {
+        FileUtils.deleteDirectory(folder)
+      }
+      folder.mkdir()
+      for (i <- 0 until 100) {
+        val file = s"$folder/file$i.csv"
+        val writer = new FileWriter(file)
+        writer.write("id,name,city,age\n")
+        writer.write(s"$i,name_$i,city_${ i },${ i % 100 }")
+        writer.close()
+      }
+      sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfilesread")
+      FileUtils.deleteDirectory(folder)
+      val dataFrame = sql("select * from smallpartitionfilesread")
+      val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
+        case b: BatchedDataSourceScanExec if b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
+          .asInstanceOf[CarbonScanRDD]
+      }.head
+      assert(scanRdd.getPartitions.length < 10)
+      assertResult(100)(dataFrame.count)
+    } finally {
+      CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
+        CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+    }
+  }
+
+
   def restoreData(dblocation: String, tableName: String) = {
     val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
     val source = dblocation+ "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
@@ -435,6 +506,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
 
   override def afterAll = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
+      CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
     dropTable
     if (executorService != null && !executorService.isShutdown) {
       executorService.shutdownNow()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 781b484..8be70a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -17,26 +17,12 @@
 
 package org.apache.carbondata.spark.load
 
-import java.text.SimpleDateFormat
-import java.util.{Comparator, Date, Locale}
-
-import scala.collection.mutable.ArrayBuffer
+import java.util.Comparator
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
 import org.apache.spark.TaskContext
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
-import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -45,12 +31,10 @@ import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Use sortBy operator in spark to load the data
@@ -68,7 +52,7 @@ object DataLoadProcessBuilderOnSpark {
     } else {
       // input data from files
       val columnCount = model.getCsvHeaderColumns.length
-      csvFileScanRDD(sparkSession, model, hadoopConf)
+      DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf)
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
     }
 
@@ -166,112 +150,4 @@ object DataLoadProcessBuilderOnSpark {
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
-
-  /**
-   * creates a RDD that does reading of multiple CSV files
-   */
-  def csvFileScanRDD(
-      spark: SparkSession,
-      model: CarbonLoadModel,
-      hadoopConf: Configuration
-  ): RDD[InternalRow] = {
-    // 1. partition
-    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
-    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
-    val defaultParallelism = spark.sparkContext.defaultParallelism
-    CommonUtil.configureCSVInputFormat(hadoopConf, model)
-    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
-    val jobConf = new JobConf(hadoopConf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    val jobContext = new JobContextImpl(jobConf, null)
-    val inputFormat = new CSVInputFormat()
-    val rawSplits = inputFormat.getSplits(jobContext).toArray
-    val splitFiles = rawSplits.map { split =>
-      val fileSplit = split.asInstanceOf[FileSplit]
-      PartitionedFile(
-        InternalRow.empty,
-        fileSplit.getPath.toString,
-        fileSplit.getStart,
-        fileSplit.getLength,
-        fileSplit.getLocations)
-    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
-    val bytesPerCore = totalBytes / defaultParallelism
-
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
-    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-                s"open cost is considered as scanning $openCostInBytes bytes.")
-
-    val partitions = new ArrayBuffer[FilePartition]
-    val currentFiles = new ArrayBuffer[PartitionedFile]
-    var currentSize = 0L
-
-    def closePartition(): Unit = {
-      if (currentFiles.nonEmpty) {
-        val newPartition =
-          FilePartition(
-            partitions.size,
-            currentFiles.toArray.toSeq)
-        partitions += newPartition
-      }
-      currentFiles.clear()
-      currentSize = 0
-    }
-
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
-        closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
-      currentFiles += file
-    }
-    closePartition()
-
-    // 2. read function
-    val serializableConfiguration = new SerializableConfiguration(jobConf)
-    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
-      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
-        new Iterator[InternalRow] {
-          val hadoopConf = serializableConfiguration.value
-          val jobTrackerId: String = {
-            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-            formatter.format(new Date())
-          }
-          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
-          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
-          val inputSplit =
-            new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
-          var finished = false
-          val inputFormat = new CSVInputFormat()
-          val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
-          reader.initialize(inputSplit, hadoopAttemptContext)
-
-          override def hasNext: Boolean = {
-            if (!finished) {
-              if (reader != null) {
-                if (reader.nextKeyValue()) {
-                  true
-                } else {
-                  finished = true
-                  reader.close()
-                  false
-                }
-              } else {
-                finished = true
-                false
-              }
-            } else {
-              false
-            }
-          }
-
-          override def next(): InternalRow = {
-            new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
-          }
-        }
-      }
-    }
-    new FileScanRDD(spark, readFunction, partitions)
-  }
 }
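csvFileScanRDD is not deleted here; it moves verbatim into DataLoadingUtil (the next file in this patch) so that both the global-sort flow above and the partition-table load in CarbonLoadDataCommand can share it. Its first half is Spark-style bin packing of the CSV splits: files are taken largest first and a partition is closed as soon as adding the next file would exceed maxSplitBytes, with every file charged at least openCostInBytes. A stripped-down illustration of just that packing step (simplified to plain split sizes; not part of the patch):

    // sizes: input split lengths in bytes; openCost/maxSplit mirror the Spark session settings
    def packSplits(sizes: Seq[Long], openCost: Long, maxSplit: Long): Seq[Seq[Long]] = {
      val partitions = scala.collection.mutable.ArrayBuffer[Seq[Long]]()
      val current = scala.collection.mutable.ArrayBuffer[Long]()
      var currentSize = 0L
      def close(): Unit = {
        if (current.nonEmpty) partitions += current.toList
        current.clear()
        currentSize = 0
      }
      sizes.sortBy(-_).foreach { len =>
        if (currentSize + len > maxSplit) close()  // start a new partition before it overflows
        currentSize += len + openCost              // tiny files still cost at least openCost
        current += len
      }
      close()
      partitions.toList
    }

With many tiny files the number of packed partitions approaches totalBytes / maxSplitBytes instead of one task per file, which is what keeps the number of written carbondata files small.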
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 5e9f7fe..8b4c232 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -17,12 +17,28 @@
 
 package org.apache.carbondata.spark.util
 
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale}
+
 import scala.collection.{immutable, mutable}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
+import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
 import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -32,10 +48,13 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark.LOGGER
 import org.apache.carbondata.spark.load.ValidateUtil
+import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 /**
  * the util object of data loading
@@ -403,4 +422,112 @@
     }
   }
 
+  /**
+   * creates a RDD that does reading of multiple CSV files
+   */
+  def csvFileScanRDD(
+      spark: SparkSession,
+      model: CarbonLoadModel,
+      hadoopConf: Configuration
+  ): RDD[InternalRow] = {
+    // 1. partition
+    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
+    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
+    val defaultParallelism = spark.sparkContext.defaultParallelism
+    CommonUtil.configureCSVInputFormat(hadoopConf, model)
+    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
+    val jobConf = new JobConf(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val jobContext = new JobContextImpl(jobConf, null)
+    val inputFormat = new CSVInputFormat()
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val splitFiles = rawSplits.map { split =>
+      val fileSplit = split.asInstanceOf[FileSplit]
+      PartitionedFile(
+        InternalRow.empty,
+        fileSplit.getPath.toString,
+        fileSplit.getStart,
+        fileSplit.getLength,
+        fileSplit.getLocations)
+    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
+    val bytesPerCore = totalBytes / defaultParallelism
+
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
+                s"open cost is considered as scanning $openCostInBytes bytes.")
+
+    val partitions = new ArrayBuffer[FilePartition]
+    val currentFiles = new ArrayBuffer[PartitionedFile]
+    var currentSize = 0L
+
+    def closePartition(): Unit = {
+      if (currentFiles.nonEmpty) {
+        val newPartition =
+          FilePartition(
+            partitions.size,
+            currentFiles.toArray.toSeq)
+        partitions += newPartition
+      }
+      currentFiles.clear()
+      currentSize = 0
+    }
+
+    splitFiles.foreach { file =>
+      if (currentSize + file.length > maxSplitBytes) {
+        closePartition()
+      }
+      // Add the given file to the current partition.
+      currentSize += file.length + openCostInBytes
+      currentFiles += file
+    }
+    closePartition()
+
+    // 2. read function
+    val serializableConfiguration = new SerializableConfiguration(jobConf)
+    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
+      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
+        new Iterator[InternalRow] {
+          val hadoopConf = serializableConfiguration.value
+          val jobTrackerId: String = {
+            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+            formatter.format(new Date())
+          }
+          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
+          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
+          val inputSplit =
+            new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
+          var finished = false
+          val inputFormat = new CSVInputFormat()
+          val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
+          reader.initialize(inputSplit, hadoopAttemptContext)
+
+          override def hasNext: Boolean = {
+            if (!finished) {
+              if (reader != null) {
+                if (reader.nextKeyValue()) {
+                  true
+                } else {
+                  finished = true
+                  reader.close()
+                  false
+                }
+              } else {
+                finished = true
+                false
+              }
+            } else {
+              false
+            }
+          }
+
+          override def next(): InternalRow = {
+            new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
+          }
+        }
+      }
+    }
+    new FileScanRDD(spark, readFunction, partitions)
+  }
+
 }
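The copy added here is identical to the method removed from DataLoadProcessBuilderOnSpark; only the imports (and the reuse of that object's LOGGER) are new. To make the sizing formula concrete, here is a worked example with Spark's usual defaults, which are assumptions rather than values taken from this patch:

    // Assumed defaults: filesMaxPartitionBytes = 128 MB, filesOpenCostInBytes = 4 MB,
    // defaultParallelism = 4, and 100 CSV files of roughly 1 KB each.
    // totalBytes    = 100 * (1 KB + 4 MB)            ~= 400 MB
    // bytesPerCore  = 400 MB / 4                      = 100 MB
    // maxSplitBytes = min(128 MB, max(4 MB, 100 MB))  = 100 MB
    // Every tiny file still adds ~4 MB of weight, so about 25 files are packed per
    // partition and the whole load plans roughly 4 tasks instead of 100.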
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 8e6c20e..7d49c11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -26,22 +26,18 @@ import scala.collection.mutable
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{NewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericInternalRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
-import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -60,22 +56,21 @@ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath}
+import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
-import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.exception.{NoRetryException}
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel}
+import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -95,6 +90,10 @@ case class CarbonLoadDataCommand(
 
   var table: CarbonTable = _
 
+  var logicalPartitionRelation: LogicalRelation = _
+
+  var sizeInBytes: Long = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -113,6 +112,15 @@ case class CarbonLoadDataCommand(
       }
       relation.carbonTable
     }
+    if (table.isHivePartitionTable) {
+      logicalPartitionRelation =
+        new FindDataSourceTable(sparkSession).apply(
+          sparkSession.sessionState.catalog.lookupRelation(
+            TableIdentifier(tableName, databaseNameOp))).collect {
+          case l: LogicalRelation => l
+        }.head
+      sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
+    }
     operationContext.setProperty("isOverwrite", isOverwriteTable)
     if(CarbonUtil.hasAggregationDataMap(table)) {
       val loadMetadataEvent = new LoadMetadataEvent(table, false)
@@ -500,20 +508,7 @@
       operationContext: OperationContext) = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
-    val logicalPlan =
-      sparkSession.sessionState.catalog.lookupRelation(
-        identifier)
-    val catalogTable: CatalogTable = logicalPlan.collect {
-      case l: LogicalRelation => l.catalogTable.get
-      case c // To make compatabile with spark 2.1 and 2.2 we need to compare classes
-        if c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
-           c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
-           c.getClass.getName.equals(
-             "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
-        CarbonReflectionUtils.getFieldOfCatalogTable(
-          "tableMeta",
-          c).asInstanceOf[CatalogTable]
-    }.head
+    val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
     val currentPartitions =
       CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier)
     // Clean up the alreday dropped partitioned data
@@ -581,10 +576,6 @@
       }
     } else {
       // input data from csv files. Convert to logical plan
-      CommonUtil.configureCSVInputFormat(hadoopConf, carbonLoadModel)
-      hadoopConf.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
-      val jobConf = new JobConf(hadoopConf)
-      SparkHadoopUtil.get.addCredentials(jobConf)
       val attributes =
         StructType(carbonLoadModel.getCsvHeaderColumns.map(
          StructField(_, StringType))).toAttributes
@@ -603,28 +594,27 @@
       }
       val len = rowDataTypes.length
       var rdd =
-        new NewHadoopRDD[NullWritable, StringArrayWritable](
-          sparkSession.sparkContext,
-          classOf[CSVInputFormat],
-          classOf[NullWritable],
-          classOf[StringArrayWritable],
-          jobConf).map { case (key, value) =>
-          val data = new Array[Any](len)
-          var i = 0
-          val input = value.get()
-          val inputLen = Math.min(input.length, len)
-          while (i < inputLen) {
-            data(i) = UTF8String.fromString(input(i))
-            // If partition column then update empty value with special string otherwise spark
-            // makes it as null so we cannot internally handle badrecords.
-            if (partitionColumns(i)) {
-              if (input(i) != null && input(i).isEmpty) {
-                data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+        DataLoadingUtil.csvFileScanRDD(
+          sparkSession,
+          model = carbonLoadModel,
+          hadoopConf)
+          .map { row =>
+            val data = new Array[Any](len)
+            var i = 0
+            val input = row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]]
+            val inputLen = Math.min(input.length, len)
+            while (i < inputLen) {
+              data(i) = UTF8String.fromString(input(i))
+              // If partition column then update empty value with special string otherwise spark
+              // makes it as null so we cannot internally handle badrecords.
+              if (partitionColumns(i)) {
+                if (input(i) != null && input(i).isEmpty) {
+                  data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
+                }
               }
+              i = i + 1
             }
-            i = i + 1
-          }
-          InternalRow.fromSeq(data)
+            InternalRow.fromSeq(data)
         }
 
       // Only select the required columns
@@ -638,10 +628,6 @@
         }
         Project(output, LogicalRDD(attributes, rdd)(sparkSession))
       }
-      // TODO need to find a way to avoid double lookup
-      val sizeInBytes =
-        CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
-          catalogTable.identifier)(sparkSession).asInstanceOf[CarbonRelation].sizeInBytes
       val convertRelation = convertToLogicalRelation(
         catalogTable,
         sizeInBytes,
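Two smaller changes ride along with the switch from NewHadoopRDD to DataLoadingUtil.csvFileScanRDD: the partition table's LogicalRelation and its sizeInBytes are now resolved once in processMetadata and cached on the command, which removes the double lookup the old TODO complained about, and empty strings in partition columns are still rewritten to MEMBER_DEFAULT_VAL so Spark does not turn them into nulls before bad-record handling can see them. The rewrite in isolation (condensed from the mapped function above, not a new API):

    // input(i): the raw CSV value for column i; partitionColumns(i): true for partition columns
    if (partitionColumns(i) && input(i) != null && input(i).isEmpty) {
      // keep a non-null placeholder so bad-record handling, not Spark, decides what happens
      data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
    }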
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e527c059/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index a2248ee..98d150e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -403,6 +403,10 @@ public class SortParameters implements Serializable {
     LOGGER.info("temp file location: " + StringUtils.join(parameters.getTempFileLocation(), ","));
 
     int numberOfCores = carbonProperties.getNumberOfCores() / 2;
+    // In case of loading from partition we should use the cores specified by it
+    if (configuration.getWritingCoresCount() > 0) {
+      numberOfCores = configuration.getWritingCoresCount();
+    }
     parameters.setNumberOfCores(numberOfCores > 0 ? numberOfCores : 1);
 
     parameters.setFileWriteBufferSize(Integer.parseInt(carbonProperties