http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala deleted file mode 100644 index 9c5ab69..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ /dev/null @@ -1,451 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.util - -import scala.collection.{immutable, mutable} -import scala.collection.JavaConverters._ - -import org.apache.commons.lang3.StringUtils -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.util.CarbonException - -import org.apache.carbondata.common.constants.LoggerAction -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} -import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} -import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants -import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} -import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.load.ValidateUtil - -/** - * the util object of data loading - */ -object DataLoadingUtil { - - val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - /** - * get data loading options and initialise default value - */ - def getDataLoadingOptions( - carbonProperty: CarbonProperties, - options: immutable.Map[String, String]): mutable.Map[String, String] = { - val optionsFinal = scala.collection.mutable.Map[String, String]() - optionsFinal.put("delimiter", options.getOrElse("delimiter", ",")) - optionsFinal.put("quotechar", options.getOrElse("quotechar", "\"")) - optionsFinal.put("fileheader", options.getOrElse("fileheader", "")) - optionsFinal.put("commentchar", options.getOrElse("commentchar", "#")) - optionsFinal.put("columndict", options.getOrElse("columndict", null)) - - optionsFinal.put("escapechar", - CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\"))) - - optionsFinal.put( - "serialization_null_format", - options.getOrElse("serialization_null_format", "\\N")) - - optionsFinal.put( - "bad_records_logger_enable", - options.getOrElse( - "bad_records_logger_enable", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))) - - val badRecordActionValue = carbonProperty.getProperty( - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT) - - optionsFinal.put( - "bad_records_action", - options.getOrElse( - "bad_records_action", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, - badRecordActionValue))) - - optionsFinal.put( - "is_empty_data_bad_record", - options.getOrElse( - "is_empty_data_bad_record", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))) - - optionsFinal.put( - "skip_empty_line", - options.getOrElse( - "skip_empty_line", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE))) - - optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", "")) - - optionsFinal.put( - "complex_delimiter_level_1", - options.getOrElse("complex_delimiter_level_1", "\\$")) - - optionsFinal.put( - "complex_delimiter_level_2", - options.getOrElse("complex_delimiter_level_2", "\\:")) - - optionsFinal.put( - "dateformat", - options.getOrElse( - "dateformat", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))) - - optionsFinal.put( - "timestampformat", - options.getOrElse( - "timestampformat", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT))) - - optionsFinal.put( - "global_sort_partitions", - options.getOrElse( - "global_sort_partitions", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, - null))) - - optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null)) - - optionsFinal.put( - "batch_sort_size_inmb", - options.getOrElse( - "batch_sort_size_inmb", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, - carbonProperty.getProperty( - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))) - - optionsFinal.put( - "bad_record_path", - options.getOrElse( - "bad_record_path", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperty.getProperty( - CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))) - - val useOnePass = options.getOrElse( - "single_pass", - carbonProperty.getProperty( - CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, - CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match { - case "true" => - true - case "false" => - // when single_pass = false and if either alldictionarypath - // or columnDict is configured the do not allow load - if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) || - StringUtils.isNotEmpty(optionsFinal("columndict"))) { - throw new MalformedCarbonCommandException( - "Can not use all_dictionary_path or columndict without single_pass.") - } else { - false - } - case illegal => - val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " + - "Please set it as 'true' or 'false'") - false - } - optionsFinal.put("single_pass", useOnePass.toString) - optionsFinal - } - - /** - * check whether using default value or not - */ - private def checkDefaultValue(value: String, default: String) = { - if (StringUtils.isEmpty(value)) { - default - } else { - value - } - } - - /** - * build CarbonLoadModel for data loading - * @param table CarbonTable object containing all metadata information for the table - * like table name, table path, schema, etc - * @param options Load options from user input - * @return a new CarbonLoadModel instance - */ - def buildCarbonLoadModelJava( - table: CarbonTable, - options: java.util.Map[String, String] - ): CarbonLoadModel = { - val carbonProperty: CarbonProperties = CarbonProperties.getInstance - val optionsFinal = getDataLoadingOptions(carbonProperty, options.asScala.toMap) - optionsFinal.put("sort_scope", "no_sort") - if (!options.containsKey("fileheader")) { - val csvHeader = table.getCreateOrderColumn(table.getTableName) - .asScala.map(_.getColName).mkString(",") - optionsFinal.put("fileheader", csvHeader) - } - val model = new CarbonLoadModel() - buildCarbonLoadModel( - table = table, - carbonProperty = carbonProperty, - options = options.asScala.toMap, - optionsFinal = optionsFinal, - carbonLoadModel = model, - hadoopConf = null) // we have provided 'fileheader', so it can be null - - // set default values - model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) - model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) - model.setUseOnePass(options.asScala.getOrElse("onepass", "false").toBoolean) - model.setDictionaryServerHost(options.asScala.getOrElse("dicthost", null)) - model.setDictionaryServerPort(options.asScala.getOrElse("dictport", "-1").toInt) - model - } - - /** - * build CarbonLoadModel for data loading - * @param table CarbonTable object containing all metadata information for the table - * like table name, table path, schema, etc - * @param carbonProperty Carbon property instance - * @param options Load options from user input - * @param optionsFinal Load options that populated with default values for optional options - * @param carbonLoadModel The output load model - * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in - * user provided load options - */ - def buildCarbonLoadModel( - table: CarbonTable, - carbonProperty: CarbonProperties, - options: immutable.Map[String, String], - optionsFinal: mutable.Map[String, String], - carbonLoadModel: CarbonLoadModel, - hadoopConf: Configuration): Unit = { - carbonLoadModel.setTableName(table.getTableName) - carbonLoadModel.setDatabaseName(table.getDatabaseName) - carbonLoadModel.setTablePath(table.getTablePath) - carbonLoadModel.setTableName(table.getTableName) - val dataLoadSchema = new CarbonDataLoadSchema(table) - // Need to fill dimension relation - carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema) - val sort_scope = optionsFinal("sort_scope") - val single_pass = optionsFinal("single_pass") - val bad_records_logger_enable = optionsFinal("bad_records_logger_enable") - val bad_records_action = optionsFinal("bad_records_action") - var bad_record_path = optionsFinal("bad_record_path") - val global_sort_partitions = optionsFinal("global_sort_partitions") - val timestampformat = optionsFinal("timestampformat") - val dateFormat = optionsFinal("dateformat") - val delimeter = optionsFinal("delimiter") - val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1") - val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2") - val all_dictionary_path = optionsFinal("all_dictionary_path") - val column_dict = optionsFinal("columndict") - ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat") - ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat") - ValidateUtil.validateSortScope(table, sort_scope) - - if (bad_records_logger_enable.toBoolean || - LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) { - bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path) - if (!CarbonUtil.isValidBadStorePath(bad_record_path)) { - CarbonException.analysisException("Invalid bad records location.") - } - } - carbonLoadModel.setBadRecordsLocation(bad_record_path) - - ValidateUtil.validateGlobalSortPartitions(global_sort_partitions) - carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\")) - carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\"")) - carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#")) - - // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option, - // we should use table schema to generate file header. - var fileHeader = optionsFinal("fileheader") - val headerOption = options.get("header") - if (headerOption.isDefined) { - // whether the csv file has file header - // the default value is true - val header = try { - headerOption.get.toBoolean - } catch { - case ex: IllegalArgumentException => - throw new MalformedCarbonCommandException( - "'header' option should be either 'true' or 'false'. " + ex.getMessage) - } - if (header) { - if (fileHeader.nonEmpty) { - throw new MalformedCarbonCommandException( - "When 'header' option is true, 'fileheader' option is not required.") - } - } else { - if (fileHeader.isEmpty) { - fileHeader = table.getCreateOrderColumn(table.getTableName) - .asScala.map(_.getColName).mkString(",") - } - } - } - - carbonLoadModel.setTimestampformat(timestampformat) - carbonLoadModel.setDateFormat(dateFormat) - carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty( - CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) - - carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty( - CarbonCommonConstants.CARBON_DATE_FORMAT, - CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) - - carbonLoadModel.setSerializationNullFormat( - TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + - optionsFinal("serialization_null_format")) - - carbonLoadModel.setBadRecordsLoggerEnable( - TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable) - - carbonLoadModel.setBadRecordsAction( - TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action.toUpperCase) - - carbonLoadModel.setIsEmptyDataBadRecord( - DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + - optionsFinal("is_empty_data_bad_record")) - - carbonLoadModel.setSkipEmptyLine(optionsFinal("skip_empty_line")) - - carbonLoadModel.setSortScope(sort_scope) - carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb")) - carbonLoadModel.setGlobalSortPartitions(global_sort_partitions) - carbonLoadModel.setUseOnePass(single_pass.toBoolean) - - if (delimeter.equalsIgnoreCase(complex_delimeter_level1) || - complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) || - delimeter.equalsIgnoreCase(complex_delimeter_level2)) { - CarbonException.analysisException(s"Field Delimiter and Complex types delimiter are same") - } else { - carbonLoadModel.setComplexDelimiterLevel1( - CarbonUtil.delimiterConverter(complex_delimeter_level1)) - carbonLoadModel.setComplexDelimiterLevel2( - CarbonUtil.delimiterConverter(complex_delimeter_level2)) - } - // set local dictionary path, and dictionary file extension - carbonLoadModel.setAllDictPath(all_dictionary_path) - carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)) - carbonLoadModel.setCsvHeader(fileHeader) - carbonLoadModel.setColDictFilePath(column_dict) - carbonLoadModel.setCsvHeaderColumns( - CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf)) - - val validatedMaxColumns = CommonUtil.validateMaxColumns( - carbonLoadModel.getCsvHeaderColumns, - optionsFinal("maxcolumns")) - - carbonLoadModel.setMaxColumns(validatedMaxColumns.toString) - if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) - } - } - - private def isLoadDeletionRequired(metaDataLocation: String): Boolean = { - val details = SegmentStatusManager.readLoadMetadata(metaDataLocation) - if (details != null && details.nonEmpty) for (oneRow <- details) { - if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus || - SegmentStatus.COMPACTED == oneRow.getSegmentStatus || - SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus || - SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus) && - oneRow.getVisibility.equalsIgnoreCase("true")) { - return true - } - } - false - } - - def deleteLoadsAndUpdateMetadata( - isForceDeletion: Boolean, - carbonTable: CarbonTable): Unit = { - if (isLoadDeletionRequired(carbonTable.getMetadataPath)) { - val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath) - val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - val carbonTableStatusLock = - CarbonLockFactory.getCarbonLockObj( - absoluteTableIdentifier, - LockUsage.TABLE_STATUS_LOCK - ) - - // Delete marked loads - val isUpdationRequired = - DeleteLoadFolders.deleteLoadFoldersFromFileSystem( - absoluteTableIdentifier, - isForceDeletion, - details, - carbonTable.getMetadataPath - ) - - var updationCompletionStaus = false - - if (isUpdationRequired) { - try { - // Update load metadate file after cleaning deleted nodes - if (carbonTableStatusLock.lockWithRetries()) { - LOGGER.info("Table status lock has been successfully acquired.") - - // read latest table status again. - val latestMetadata = SegmentStatusManager - .readLoadMetadata(carbonTable.getMetadataPath) - - // update the metadata details from old to new status. - val latestStatus = CarbonLoaderUtil - .updateLoadMetadataFromOldToNew(details, latestMetadata) - - CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus) - } else { - val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName - val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName - val errorMsg = "Clean files request is failed for " + - s"$dbName.$tableName" + - ". Not able to acquire the table status lock due to other operation " + - "running in the background." - LOGGER.audit(errorMsg) - LOGGER.error(errorMsg) - throw new Exception(errorMsg + " Please try after some time.") - } - updationCompletionStaus = true - } finally { - CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) - } - if (updationCompletionStaus) { - DeleteLoadFolders - .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier, - carbonTable.getMetadataPath, isForceDeletion) - } - } - } - } - -}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index bbf345c..bcb7bd9 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -321,7 +321,7 @@ object GlobalDictionaryUtil { carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) // get load count if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) + carbonLoadModel.readAndSetLoadMetadataDetails() } val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table) DictionaryLoadModel( http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 3da603b..2dcff81 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.DataTypes @@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.processing.util.CarbonLoaderUtil -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil} /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index f47c9bc..c7dd553 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -163,7 +163,7 @@ object CarbonDataRDDFactory { if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) { // update the updated table status. For the case of Update Delta Compaction the Metadata // is filled in LoadModel, no need to refresh. - CommonUtil.readLoadMetadataDetails(carbonLoadModel) + carbonLoadModel.readAndSetLoadMetadataDetails() } val compactionThread = new Thread { @@ -282,7 +282,7 @@ object CarbonDataRDDFactory { loadModel.setTableName(table.getCarbonTableIdentifier.getTableName) loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName) loadModel.setTablePath(table.getTablePath) - CommonUtil.readLoadMetadataDetails(loadModel) + loadModel.readAndSetLoadMetadataDetails() val loadStartTime = CarbonUpdateUtil.readCurrentTime() loadModel.setFactTimeStamp(loadStartTime) loadModel http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index ddc8586..1e19111 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -77,7 +77,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, } // scan again and determine if anything is there to merge again. - CommonUtil.readLoadMetadataDetails(carbonLoadModel) + carbonLoadModel.readAndSetLoadMetadataDetails() segList = carbonLoadModel.getLoadMetadataDetails // in case of major compaction we will scan only once and come out as it will keep // on doing major for the new loads also. http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index e61b636..5dcca6d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -35,13 +35,13 @@ import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CarbonException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.spark.CarbonOption -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory} /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index c4d32b4..e93ab25 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -22,9 +22,9 @@ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._ -import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException} /** * Below command class will be used to create datamap on table http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index 1fa2494..e5db286 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} @@ -34,7 +35,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.events._ -import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, NoSuchDataMapException} /** * Drops the datamap and any related tables associated with the datamap http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index febb83e..18c1339 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -24,12 +24,13 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand} +import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel} import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.AlterTableUtil +import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -44,7 +45,6 @@ import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCo import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} -import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory import org.apache.carbondata.spark.util.CommonUtil import org.apache.carbondata.streaming.StreamHandoffRDD @@ -89,8 +89,6 @@ case class CarbonAlterTableCompactionCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) - val tableName = alterTableModel.tableName.toLowerCase - val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table) if (isLoadInProgress) { val message = "Cannot run data loading and compaction on same table concurrently. " + @@ -146,8 +144,7 @@ case class CarbonAlterTableCompactionCommand( operationContext: OperationContext): Unit = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase) - val compactionSize: Long = CarbonDataMergerUtil - .getCompactionSize(compactionType, carbonLoadModel) + val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel) if (CompactionType.IUD_UPDDEL_DELTA == compactionType) { if (alterTableModel.segmentUpdateStatusManager.isDefined) { carbonLoadModel.setSegmentUpdateStatusManager( @@ -162,7 +159,7 @@ case class CarbonAlterTableCompactionCommand( val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel) + carbonLoadModel.readAndSetLoadMetadataDetails() } if (compactionType == CompactionType.STREAMING) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index c7b59d4..f105778 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -68,13 +69,12 @@ import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable} import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException -import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil} case class CarbonLoadDataCommand( databaseNameOp: Option[String], @@ -148,7 +148,7 @@ case class CarbonLoadDataCommand( val carbonLoadModel = new CarbonLoadModel() try { val tableProperties = table.getTableInfo.getFactTable.getTableProperties - val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options) + val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, @@ -163,10 +163,8 @@ case class CarbonLoadDataCommand( carbonLoadModel.setAggLoadRequest( internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) - DataLoadingUtil.buildCarbonLoadModel( - table, - carbonProperty, - options, + new CarbonLoadModelBuilder(table).build( + options.asJava, optionsFinal, carbonLoadModel, hadoopConf @@ -183,7 +181,7 @@ case class CarbonLoadDataCommand( carbonLoadModel, factPath, dataFrame.isDefined, - optionsFinal.asJava, + optionsFinal, options.asJava, isOverwriteTable) operationContext.setProperty("isOverwrite", isOverwriteTable) @@ -191,7 +189,7 @@ case class CarbonLoadDataCommand( // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") // Clean up the old invalid segment data before creating a new entry for new load. - DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false) // add the start entry for the new load in the table status file if (updateModel.isEmpty && !table.isHivePartitionTable) { CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable) http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index 874d416..50c5eca 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.FailureCauses -import org.apache.carbondata.spark.exception.ConcurrentOperationException /** * IUD update delete and compaction framework. http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 2f12bef..bbea15b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.storage.StorageLevel +import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} @@ -32,7 +33,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent} import org.apache.carbondata.processing.loading.FailureCauses -import org.apache.carbondata.spark.exception.ConcurrentOperationException private[sql] case class CarbonProjectForUpdateCommand( plan: LogicalPlan, http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala index 5817d88..220d75d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala @@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.HiveSessionCatalog +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** * Util for IUD common function http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala index fed4235..bf72325 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala @@ -33,7 +33,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} -import org.apache.carbondata.spark.util.DataLoadingUtil /** * Below command class will be used to create pre-aggregate table @@ -182,7 +181,7 @@ case class CreatePreAggregateTableCommand( // This will be used to check if the parent table has any segments or not. If not then no // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT // table. - DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false) val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath) if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS || load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index feef7a1..7e3b80e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.parser.CarbonSpark2SqlParser import org.apache.spark.sql.types.DataType +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} @@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchem import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.format.TableInfo -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 7a56dbf..fc780cb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.util.AlterTableUtil +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -37,7 +38,6 @@ import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.SchemaEvolutionEntry -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException private[sql] case class CarbonAlterTableRenameCommand( alterTableRenameModel: AlterTableRenameModel) http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala index 987d4fe..9e0cee5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala @@ -18,12 +18,12 @@ package org.apache.spark.sql.execution.command.timeseries import org.apache.spark.sql.execution.command.{DataMapField, Field} +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES import org.apache.carbondata.core.metadata.schema.datamap.Granularity import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.preagg.TimeSeriesUDF -import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException} /** * Utility class for time series to keep http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 945f47f..072216c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -46,8 +46,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable -import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util} +import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} class CarbonFileFormat extends FileFormat @@ -82,7 +82,7 @@ with Serializable { TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession) val model = new CarbonLoadModel val carbonProperty = CarbonProperties.getInstance() - val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options) + val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) val tableProperties = table.getTableInfo.getFactTable.getTableProperties optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, @@ -97,14 +97,11 @@ with Serializable { val optionsLocal = new mutable.HashMap[String, String]() optionsLocal ++= options optionsLocal += (("header", "false")) - DataLoadingUtil.buildCarbonLoadModel( - table, - carbonProperty, - optionsLocal.toMap, + new CarbonLoadModelBuilder(table).build( + optionsLocal.toMap.asJava, optionsFinal, model, - conf - ) + conf) model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean) model.setDictionaryServerHost(options.getOrElse("dicthost", null)) model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index b174b94..b36d7c0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.datasources.RefreshTable import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.util.{CarbonReflectionUtils, FileUtils} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.merger.CompactionType -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** * Carbon strategies for ddl commands http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala index 608ec60..7028dcf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.AlterTableRenameCommand import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand} import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException /** * Strategy for streaming table, like blocking unsupported operation http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala index 7ca34af..27c7d17 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala @@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException case class CarbonDropDatabaseCommand(command: DropDatabaseCommand) extends RunnableCommand { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 4045478..542115e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.spark.CarbonOption -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index ad6d0c7..ef4836e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -32,10 +32,10 @@ import org.apache.spark.sql.types.StructField import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.hadoop.util.SchemaReader import org.apache.carbondata.spark.CarbonOption -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index bc36e9c..aaa87a3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog} import org.apache.spark.sql.hive.HiveExternalCatalog._ +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory @@ -39,7 +40,6 @@ import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException object AlterTableUtil { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala index bc62902..a8094b6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala @@ -19,10 +19,10 @@ package org.apache.spark.util import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** * table api util http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index 3c151f0..a0dd7d9 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -43,10 +43,10 @@ import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlPa import org.apache.spark.sql.types.DecimalType import org.apache.spark.util.CarbonReflectionUtils +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CarbonScalaUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala index c676b01..5ade510 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala @@ -4,7 +4,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException /** * Created by rahul on 19/9/17. http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala index 71c5477..0b37e46 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala @@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.util.TableOptionConstant /** @@ -63,7 +63,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll { CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns( - CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) + LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) // Create table and metadata folders if not exist val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) val fileType = FileFactory.getFileType(metadataDirectoryPath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala index c0e1781..6149e82 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala @@ -23,14 +23,14 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.exception.DataLoadingException -import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} +import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.util.TableOptionConstant -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException /** * test case for external column dictionary generation @@ -176,7 +176,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft CarbonCommonConstants.CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) carbonLoadModel.setCsvHeaderColumns( - CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) + LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration)) carbonLoadModel.setMaxColumns("100") // Create table and metadata folders if not exist val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 2552ca8..afb34b9 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -31,12 +31,12 @@ import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.types.StructType import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala index 9da7244..65a006b 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala @@ -21,11 +21,11 @@ import org.apache.spark.sql.common.util.Spark2QueryTest import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index d36dd26..b035834 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -26,9 +26,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest import org.apache.spark.sql.test.TestQueryExecutor import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index fef2da6..31c5b27 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -18,14 +18,16 @@ package org.apache.carbondata.processing.loading.model; import java.io.Serializable; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; - +import org.apache.carbondata.core.util.path.CarbonTablePath; public class CarbonLoadModel implements Serializable { @@ -785,4 +787,13 @@ public class CarbonLoadModel implements Serializable { public void setSkipEmptyLine(String skipEmptyLine) { this.skipEmptyLine = skipEmptyLine; } + + /** + * Read segments metadata from table status file and set it to this load model object + */ + public void readAndSetLoadMetadataDetails() { + String metadataPath = CarbonTablePath.getMetadataPath(tablePath); + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath); + setLoadMetadataDetails(Arrays.asList(details)); + } }