http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
deleted file mode 100644
index 9c5ab69..0000000
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.collection.{immutable, mutable}
-import scala.collection.JavaConverters._
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.util.CarbonException
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import 
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, 
DeleteLoadFolders, TableOptionConstant}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
-
-/**
- * the util object of data loading
- */
-object DataLoadingUtil {
-
-  // Object-level logger, named after the canonical class name of this utility object.
-  val LOGGER: LogService =
-    LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * Resolves the effective data loading options.
-   *
-   * Every supported key is looked up in the user supplied `options` first. When the
-   * user did not set it, the value falls back to the matching carbon property and/or
-   * a built-in default. The returned map therefore always contains every supported
-   * key, although some of them may map to null (e.g. "columndict", "maxcolumns").
-   *
-   * @param carbonProperty carbon properties used for the property level fallbacks
-   * @param options load options given by the user
-   * @return a mutable map holding the resolved value of every load option
-   * @throws MalformedCarbonCommandException if single_pass resolves to false while
-   *         all_dictionary_path or columndict is configured
-   */
-  def getDataLoadingOptions(
-      carbonProperty: CarbonProperties,
-      options: immutable.Map[String, String]): mutable.Map[String, String] = {
-    val optionsFinal = mutable.Map[String, String]()
-
-    // Stores the user value for `key`, or `default` when the user did not set it.
-    // `default` is by-name, so property lookups only run when they are needed.
-    def fill(key: String, default: => String): Unit = {
-      optionsFinal.put(key, options.getOrElse(key, default))
-    }
-
-    fill("delimiter", ",")
-    fill("quotechar", "\"")
-    fill("fileheader", "")
-    fill("commentchar", "#")
-    fill("columndict", null)
-
-    // the escape character is normalised by CarbonLoaderUtil before it is stored
-    optionsFinal.put(
-      "escapechar",
-      CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\")))
-
-    fill("serialization_null_format", "\\N")
-
-    fill(
-      "bad_records_logger_enable",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))
-
-    // fallback for the load level bad record action property
-    val badRecordActionValue = carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-
-    fill(
-      "bad_records_action",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-        badRecordActionValue))
-
-    fill(
-      "is_empty_data_bad_record",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))
-
-    fill(
-      "skip_empty_line",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE))
-
-    fill("all_dictionary_path", "")
-    fill("complex_delimiter_level_1", "\\$")
-    fill("complex_delimiter_level_2", "\\:")
-
-    fill(
-      "dateformat",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))
-
-    fill(
-      "timestampformat",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT))
-
-    fill(
-      "global_sort_partitions",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-        null))
-
-    fill("maxcolumns", null)
-
-    // load level property first, then the common property, then its default
-    fill(
-      "batch_sort_size_inmb",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-        carbonProperty.getProperty(
-          CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-          CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
-
-    // load level property first, then the common property, then its default
-    fill(
-      "bad_record_path",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-        carbonProperty.getProperty(
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))
-
-    val singlePassValue = options.getOrElse(
-      "single_pass",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT))
-
-    val useOnePass = singlePassValue.trim.toLowerCase match {
-      case "true" =>
-        true
-      case "false" =>
-        // when single_pass = false and either all_dictionary_path or columndict is
-        // configured, the load is not allowed
-        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
-            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
-          throw new MalformedCarbonCommandException(
-            "Can not use all_dictionary_path or columndict without single_pass.")
-        }
-        false
-      case illegal =>
-        // any other value is reported and treated as 'false'; the object-level
-        // LOGGER is reused instead of building a second, identical logger here
-        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " +
-                     "Please set it as 'true' or 'false'")
-        false
-    }
-    optionsFinal.put("single_pass", useOnePass.toString)
-    optionsFinal
-  }
-
-  /**
-   * Returns `value` unless it is null or empty, in which case `default` is returned.
-   */
-  private def checkDefaultValue(value: String, default: String) =
-    Option(value).filter(candidate => StringUtils.isNotEmpty(candidate)).getOrElse(default)
-
-  /**
-   * Java friendly entry point that creates and fills a CarbonLoadModel.
-   *
-   * The sort scope is forced to "no_sort". When the caller passes no 'fileheader'
-   * option, the header is generated from the table columns in create order. After
-   * the generic build step the timestamp format, date format, one-pass flag and
-   * dictionary server host/port of the model are overwritten (see the end of the
-   * method).
-   *
-   * @param table CarbonTable object containing all metadata information for the table
-   *              like table name, table path, schema, etc
-   * @param options Load options from user input
-   * @return a new CarbonLoadModel instance
-   */
-  def buildCarbonLoadModelJava(
-      table: CarbonTable,
-      options: java.util.Map[String, String]
-  ): CarbonLoadModel = {
-    val properties: CarbonProperties = CarbonProperties.getInstance
-    // one immutable snapshot of the Java map, reused for every lookup below
-    val userOptions = options.asScala.toMap
-
-    val resolvedOptions = getDataLoadingOptions(properties, userOptions)
-    resolvedOptions.put("sort_scope", "no_sort")
-    if (!userOptions.contains("fileheader")) {
-      val columnNames = table.getCreateOrderColumn(table.getTableName).asScala.map(_.getColName)
-      resolvedOptions.put("fileheader", columnNames.mkString(","))
-    }
-
-    val loadModel = new CarbonLoadModel()
-    buildCarbonLoadModel(
-      table = table,
-      carbonProperty = properties,
-      options = userOptions,
-      optionsFinal = resolvedOptions,
-      carbonLoadModel = loadModel,
-      hadoopConf = null)  // we have provided 'fileheader', so it can be null
-
-    // set default values
-    loadModel.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    loadModel.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-    loadModel.setUseOnePass(userOptions.getOrElse("onepass", "false").toBoolean)
-    loadModel.setDictionaryServerHost(userOptions.getOrElse("dicthost", null))
-    loadModel.setDictionaryServerPort(userOptions.getOrElse("dictport", "-1").toInt)
-    loadModel
-  }
-
-  /**
-   * Fills `carbonLoadModel` from the table metadata and the resolved load options.
-   *
-   * Date/timestamp formats, the sort scope and the global sort partitions are
-   * validated through ValidateUtil. The bad record location is qualified and checked
-   * only when bad record logging is enabled or the bad record action is REDIRECT.
-   *
-   * @param table CarbonTable object containing all metadata information for the table
-   *              like table name, table path, schema, etc
-   * @param carbonProperty Carbon property instance
-   * @param options Load options from user input
-   * @param optionsFinal Load options that populated with default values for optional
-   *                     options; it must contain every key read below
-   * @param carbonLoadModel The output load model
-   * @param hadoopConf hadoopConf is needed to read CSV header if 'fileheader' is not
-   *                   set in user provided load options
-   * @throws MalformedCarbonCommandException if the 'header' option is not a boolean, or
-   *         if it is true while 'fileheader' is also given
-   */
-  def buildCarbonLoadModel(
-      table: CarbonTable,
-      carbonProperty: CarbonProperties,
-      options: immutable.Map[String, String],
-      optionsFinal: mutable.Map[String, String],
-      carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration): Unit = {
-    // table identity (the table name is set once; a second identical call was redundant)
-    carbonLoadModel.setTableName(table.getTableName)
-    carbonLoadModel.setDatabaseName(table.getDatabaseName)
-    carbonLoadModel.setTablePath(table.getTablePath)
-    // Need to fill dimension relation
-    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
-
-    // read every required option up front, so a missing key fails before validation
-    val sortScope = optionsFinal("sort_scope")
-    val singlePass = optionsFinal("single_pass")
-    val badRecordsLoggerEnable = optionsFinal("bad_records_logger_enable")
-    val badRecordsAction = optionsFinal("bad_records_action")
-    val configuredBadRecordPath = optionsFinal("bad_record_path")
-    val globalSortPartitions = optionsFinal("global_sort_partitions")
-    val timestampFormat = optionsFinal("timestampformat")
-    val dateFormat = optionsFinal("dateformat")
-    val delimiter = optionsFinal("delimiter")
-    val complexDelimiterLevel1 = optionsFinal("complex_delimiter_level_1")
-    val complexDelimiterLevel2 = optionsFinal("complex_delimiter_level_2")
-    val allDictionaryPath = optionsFinal("all_dictionary_path")
-    val columnDict = optionsFinal("columndict")
-
-    ValidateUtil.validateDateTimeFormat(timestampFormat, "TimestampFormat")
-    ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat")
-    ValidateUtil.validateSortScope(table, sortScope)
-
-    // the location only has to be valid when bad records may actually be written
-    val badRecordPath =
-      if (badRecordsLoggerEnable.toBoolean ||
-          LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
-        val qualifiedPath = CarbonUtil.checkAndAppendHDFSUrl(configuredBadRecordPath)
-        if (!CarbonUtil.isValidBadStorePath(qualifiedPath)) {
-          CarbonException.analysisException("Invalid bad records location.")
-        }
-        qualifiedPath
-      } else {
-        configuredBadRecordPath
-      }
-    carbonLoadModel.setBadRecordsLocation(badRecordPath)
-
-    ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
-    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
-    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
-    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
-    // if there isn't file header in csv file and load sql doesn't provide FILEHEADER
-    // option, we should use table schema to generate file header.
-    val configuredHeader = optionsFinal("fileheader")
-    val fileHeader = options.get("header") match {
-      case None =>
-        // 'header' not given: keep whatever 'fileheader' resolved to
-        configuredHeader
-      case Some(headerValue) =>
-        // whether the csv file has file header
-        val csvHasHeader = try {
-          headerValue.toBoolean
-        } catch {
-          case ex: IllegalArgumentException =>
-            throw new MalformedCarbonCommandException(
-              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
-        }
-        if (csvHasHeader) {
-          if (configuredHeader.nonEmpty) {
-            throw new MalformedCarbonCommandException(
-              "When 'header' option is true, 'fileheader' option is not required.")
-          }
-          configuredHeader
-        } else if (configuredHeader.isEmpty) {
-          table.getCreateOrderColumn(table.getTableName)
-            .asScala.map(_.getColName).mkString(",")
-        } else {
-          configuredHeader
-        }
-    }
-
-    carbonLoadModel.setTimestampformat(timestampFormat)
-    carbonLoadModel.setDateFormat(dateFormat)
-    carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-
-    carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_DATE_FORMAT,
-      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-
-    // the following settings are stored as "<option name>,<value>" pairs
-    carbonLoadModel.setSerializationNullFormat(
-      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
-      optionsFinal("serialization_null_format"))
-
-    carbonLoadModel.setBadRecordsLoggerEnable(
-      TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
-
-    carbonLoadModel.setBadRecordsAction(
-      TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction.toUpperCase)
-
-    carbonLoadModel.setIsEmptyDataBadRecord(
-      DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
-      optionsFinal("is_empty_data_bad_record"))
-
-    carbonLoadModel.setSkipEmptyLine(optionsFinal("skip_empty_line"))
-
-    carbonLoadModel.setSortScope(sortScope)
-    carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
-    carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
-    carbonLoadModel.setUseOnePass(singlePass.toBoolean)
-
-    // the field delimiter and both complex delimiters must be pairwise different
-    if (delimiter.equalsIgnoreCase(complexDelimiterLevel1) ||
-        complexDelimiterLevel1.equalsIgnoreCase(complexDelimiterLevel2) ||
-        delimiter.equalsIgnoreCase(complexDelimiterLevel2)) {
-      CarbonException.analysisException(
-        "Field Delimiter and Complex types delimiter are same")
-    } else {
-      carbonLoadModel.setComplexDelimiterLevel1(
-        CarbonUtil.delimiterConverter(complexDelimiterLevel1))
-      carbonLoadModel.setComplexDelimiterLevel2(
-        CarbonUtil.delimiterConverter(complexDelimiterLevel2))
-    }
-    // set local dictionary path, and dictionary file extension
-    carbonLoadModel.setAllDictPath(allDictionaryPath)
-    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter))
-    carbonLoadModel.setCsvHeader(fileHeader)
-    carbonLoadModel.setColDictFilePath(columnDict)
-    carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf))
-
-    val validatedMaxColumns = CommonUtil.validateMaxColumns(
-      carbonLoadModel.getCsvHeaderColumns,
-      optionsFinal("maxcolumns"))
-
-    carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
-    // load the segment metadata only when the caller has not supplied it already
-    if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-    }
-  }
-
-  /**
-   * Checks whether the load metadata contains at least one visible segment whose
-   * status is MARKED_FOR_DELETE, COMPACTED, INSERT_IN_PROGRESS or
-   * INSERT_OVERWRITE_IN_PROGRESS.
-   *
-   * @param metaDataLocation metadata path the load details are read from
-   * @return true if such a segment exists, false otherwise (also when no details exist)
-   */
-  private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
-    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
-    // `exists` stops at the first match, without a non-local `return` from a closure
-    details != null && details.exists { oneRow =>
-      val hasDeletableStatus =
-        SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
-        SegmentStatus.COMPACTED == oneRow.getSegmentStatus ||
-        SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus ||
-        SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus
-      hasDeletableStatus && oneRow.getVisibility.equalsIgnoreCase("true")
-    }
-  }
-
-  /**
-   * Deletes the load folders of segments that qualify for deletion and, when that
-   * changed anything, rewrites the table status under the table status lock.
-   *
-   * Nothing happens when isLoadDeletionRequired reports no candidate segment.
-   *
-   * @param isForceDeletion passed through to the DeleteLoadFolders helpers
-   * @param carbonTable table whose segments are cleaned
-   * @throws Exception if the table status lock cannot be acquired
-   */
-  def deleteLoadsAndUpdateMetadata(
-      isForceDeletion: Boolean,
-      carbonTable: CarbonTable): Unit = {
-    if (isLoadDeletionRequired(carbonTable.getMetadataPath)) {
-      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val carbonTableStatusLock =
-        CarbonLockFactory.getCarbonLockObj(
-          absoluteTableIdentifier,
-          LockUsage.TABLE_STATUS_LOCK
-        )
-
-      // Delete marked loads
-      val isUpdationRequired =
-        DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
-          absoluteTableIdentifier,
-          isForceDeletion,
-          details,
-          carbonTable.getMetadataPath
-        )
-
-      if (isUpdationRequired) {
-        // remembers whether this call owns the lock, so `finally` never releases a
-        // lock that was not acquired here
-        var lockAcquired = false
-        try {
-          // Update load metadata file after cleaning deleted nodes
-          lockAcquired = carbonTableStatusLock.lockWithRetries()
-          if (lockAcquired) {
-            LOGGER.info("Table status lock has been successfully acquired.")
-
-            // read latest table status again.
-            val latestMetadata = SegmentStatusManager
-              .readLoadMetadata(carbonTable.getMetadataPath)
-
-            // update the metadata details from old to new status.
-            val latestStatus = CarbonLoaderUtil
-              .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
-            CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus)
-          } else {
-            val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
-            val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-            val errorMsg = "Clean files request is failed for " +
-                           s"$dbName.$tableName" +
-                           ". Not able to acquire the table status lock due to other operation " +
-                           "running in the background."
-            LOGGER.audit(errorMsg)
-            LOGGER.error(errorMsg)
-            throw new Exception(errorMsg + " Please try after some time.")
-          }
-        } finally {
-          if (lockAcquired) {
-            CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
-          }
-        }
-        // only reached when the table status was rewritten; every failure above throws
-        DeleteLoadFolders
-          .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
-            carbonTable.getMetadataPath, isForceDeletion)
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index bbf345c..bcb7bd9 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -321,7 +321,7 @@ object GlobalDictionaryUtil {
       
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
     // get load count
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
     val absoluteTableIdentifier = 
AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table)
     DictionaryLoadModel(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3da603b..2dcff81 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.constants.LoggerAction
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -42,7 +43,6 @@ import 
org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f47c9bc..c7dd553 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -163,7 +163,7 @@ object CarbonDataRDDFactory {
     if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
       // update the updated table status. For the case of Update Delta 
Compaction the Metadata
       // is filled in LoadModel, no need to refresh.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
 
     val compactionThread = new Thread {
@@ -282,7 +282,7 @@ object CarbonDataRDDFactory {
     loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
     loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
     loadModel.setTablePath(table.getTablePath)
-    CommonUtil.readLoadMetadataDetails(loadModel)
+    loadModel.readAndSetLoadMetadataDetails()
     val loadStartTime = CarbonUpdateUtil.readCurrentTime()
     loadModel.setFactTimeStamp(loadStartTime)
     loadModel

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index ddc8586..1e19111 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -77,7 +77,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       }
 
       // scan again and determine if anything is there to merge again.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
       segList = carbonLoadModel.getLoadMetadataDetails
       // in case of major compaction we will scan only once and come out as it 
will keep
       // on doing major for the new loads also.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index e61b636..5dcca6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -35,13 +35,13 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.streaming.{CarbonStreamException, 
StreamSinkFactory}
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index c4d32b4..e93ab25 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,9 +22,9 @@ import org.apache.spark.sql.execution.command._
 import 
org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 
+import 
org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, 
MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, 
MalformedDataMapCommandException}
 
 /**
  * Below command class will be used to create datamap on table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 1fa2494..e5db286 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import 
org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import 
org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, 
NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
@@ -34,7 +35,6 @@ import 
org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import 
org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, 
NoSuchDataMapException}
 
 /**
  * Drops the datamap and any related tables associated with the datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index febb83e..18c1339 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -24,12 +24,13 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, 
AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableModel, 
AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.AlterTableUtil
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -44,7 +45,6 @@ import 
org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCo
 import 
org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, 
CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, 
CompactionType}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -89,8 +89,6 @@ case class CarbonAlterTableCompactionCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService =
       LogServiceFactory.getLogService(this.getClass.getName)
-    val tableName = alterTableModel.tableName.toLowerCase
-    val databaseName = 
alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     val isLoadInProgress = 
SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
     if (isLoadInProgress) {
       val message = "Cannot run data loading and compaction on same table 
concurrently. " +
@@ -146,8 +144,7 @@ case class CarbonAlterTableCompactionCommand(
       operationContext: OperationContext): Unit = {
     val LOGGER: LogService = 
LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = 
CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
-    val compactionSize: Long = CarbonDataMergerUtil
-      .getCompactionSize(compactionType, carbonLoadModel)
+    val compactionSize = 
CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
     if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
       if (alterTableModel.segmentUpdateStatusManager.isDefined) {
         carbonLoadModel.setSegmentUpdateStatusManager(
@@ -162,7 +159,7 @@ case class CarbonAlterTableCompactionCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
 
     if (compactionType == CompactionType.STREAMING) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index c7b59d4..f105778 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.types.{StringType, StructField, 
StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -68,13 +69,12 @@ import 
org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, 
StringArrayWritable}
 import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, 
LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import 
org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, 
CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, 
DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, 
GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -148,7 +148,7 @@ case class CarbonLoadDataCommand(
     val carbonLoadModel = new CarbonLoadModel()
     try {
       val tableProperties = table.getTableInfo.getFactTable.getTableProperties
-      val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, 
options)
+      val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
       optionsFinal.put("sort_scope", 
tableProperties.asScala.getOrElse("sort_scope",
         
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
           carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
@@ -163,10 +163,8 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, 
"false").toBoolean)
       
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
-      DataLoadingUtil.buildCarbonLoadModel(
-        table,
-        carbonProperty,
-        options,
+      new CarbonLoadModelBuilder(table).build(
+        options.asJava,
         optionsFinal,
         carbonLoadModel,
         hadoopConf
@@ -183,7 +181,7 @@ case class CarbonLoadDataCommand(
             carbonLoadModel,
             factPath,
             dataFrame.isDefined,
-            optionsFinal.asJava,
+            optionsFinal,
             options.asJava,
             isOverwriteTable)
         operationContext.setProperty("isOverwrite", isOverwriteTable)
@@ -191,7 +189,7 @@ case class CarbonLoadDataCommand(
         // First system has to partition the data first and then call the load 
data
         LOGGER.info(s"Initiating Direct Load for the Table : 
($dbName.$tableName)")
         // Clean up the old invalid segment data before creating a new entry 
for new load.
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, 
table)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty && !table.isHivePartitionTable) {
           
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, 
isOverwriteTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 874d416..50c5eca 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,13 +22,13 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteFromTablePostEvent, 
DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 /**
  * IUD update delete and compaction framework.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 2f12bef..bbea15b 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -24,6 +24,7 @@ import 
org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
LockUsage}
@@ -32,7 +33,6 @@ import 
org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, 
UpdateTablePostEvent, UpdateTablePreEvent}
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 private[sql] case class CarbonProjectForUpdateCommand(
     plan: LogicalPlan,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index 5817d88..220d75d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, 
LogicalPlan}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.HiveSessionCatalog
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * Util for IUD common function

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index fed4235..bf72325 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -33,7 +33,6 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 import 
org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, 
SegmentStatusManager}
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Below command class will be used to create pre-aggregate table
@@ -182,7 +181,7 @@ case class CreatePreAggregateTableCommand(
     // This will be used to check if the parent table has any segments or not. 
If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load 
details for PARENT
     // table.
-    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, 
parentTable)
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
     val loadAvailable = 
SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
     if (loadAvailable.exists(load => load.getSegmentStatus == 
SegmentStatus.INSERT_IN_PROGRESS ||
       load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index feef7a1..7e3b80e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DataType
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
@@ -42,7 +43,6 @@ import 
org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchem
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 7a56dbf..fc780cb 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -25,6 +25,7 @@ import 
org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.util.AlterTableUtil
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -37,7 +38,6 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, 
AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 private[sql] case class CarbonAlterTableRenameCommand(
     alterTableRenameModel: AlterTableRenameModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 987d4fe..9e0cee5 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -18,12 +18,12 @@ package org.apache.spark.sql.execution.command.timeseries
 
 import org.apache.spark.sql.execution.command.{DataMapField, Field}
 
+import 
org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, 
MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import 
org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
 import org.apache.carbondata.core.metadata.schema.datamap.Granularity
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.preagg.TimeSeriesUDF
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, 
MalformedDataMapCommandException}
 
 /**
  * Utility class for time series to keep

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 945f47f..072216c 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -46,8 +46,8 @@ import 
org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
 import 
org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, 
Util}
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, 
CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 class CarbonFileFormat
   extends FileFormat
@@ -82,7 +82,7 @@ with Serializable {
       TableIdentifier(options("tableName"), 
options.get("dbName")))(sparkSession)
     val model = new CarbonLoadModel
     val carbonProperty = CarbonProperties.getInstance()
-    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, 
options)
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
     val tableProperties = table.getTableInfo.getFactTable.getTableProperties
     optionsFinal.put("sort_scope", 
tableProperties.asScala.getOrElse("sort_scope",
       
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
@@ -97,14 +97,11 @@ with Serializable {
     val optionsLocal = new mutable.HashMap[String, String]()
     optionsLocal ++= options
     optionsLocal += (("header", "false"))
-    DataLoadingUtil.buildCarbonLoadModel(
-      table,
-      carbonProperty,
-      optionsLocal.toMap,
+    new CarbonLoadModelBuilder(table).build(
+      optionsLocal.toMap.asJava,
       optionsFinal,
       model,
-      conf
-    )
+      conf)
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index b174b94..b36d7c0 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -32,10 +32,10 @@ import 
org.apache.spark.sql.execution.datasources.RefreshTable
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.merger.CompactionType
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * Carbon strategies for ddl commands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 608ec60..7028dcf 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -25,7 +25,7 @@ import 
org.apache.spark.sql.execution.command.AlterTableRenameCommand
 import 
org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, 
CarbonProjectForUpdateCommand}
 import 
org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand,
 CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 /**
  * Strategy for streaming table, like blocking unsupported operation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 7ca34af..27c7d17 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
SessionParams}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
   extends RunnableCommand {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4045478..542115e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -36,9 +36,9 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ad6d0c7..ef4836e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index bc36e9c..aaa87a3 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, 
HiveExternalCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,7 +40,6 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object AlterTableUtil {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala 
b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index bc62902..a8094b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -19,10 +19,10 @@ package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * table api util

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index 3c151f0..a0dd7d9 100644
--- 
a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ 
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -43,10 +43,10 @@ import 
org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlPa
 import org.apache.spark.sql.types.DecimalType
 import org.apache.spark.util.CarbonReflectionUtils
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.util.{CarbonProperties, 
ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index c676b01..5ade510 100644
--- 
a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -4,7 +4,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 /**
  * Created by rahul on 19/9/17.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 71c5477..0b37e46 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.TableOptionConstant
 
 /**
@@ -63,7 +63,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+      LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     // Create table and metadata folders if not exist
     val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index c0e1781..6149e82 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.TableOptionConstant
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * test case for external column dictionary generation
@@ -176,7 +176,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+      LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     carbonLoadModel.setMaxColumns("100")
     // Create table and metadata folders if not exist
     val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 2552ca8..afb34b9 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -31,12 +31,12 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 9da7244..65a006b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -21,11 +21,11 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index d36dd26..b035834 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index fef2da6..31c5b27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -18,14 +18,16 @@
 package org.apache.carbondata.processing.loading.model;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public class CarbonLoadModel implements Serializable {
 
@@ -785,4 +787,13 @@ public class CarbonLoadModel implements Serializable {
   public void setSkipEmptyLine(String skipEmptyLine) {
     this.skipEmptyLine = skipEmptyLine;
   }
+
+  /**
+   * Read segments metadata from table status file and set it to this load model object
+   */
+  public void readAndSetLoadMetadataDetails() {
+    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath);
+    setLoadMetadataDetails(Arrays.asList(details));
+  }
 }

Reply via email to