[CARBONDATA-3106] WrittenbyAPI not serialized in executor with global sort. Problem: Written_By_APPNAME, when added to CarbonProperties, is not serialized to the executor when global sort is used.
Solution: Add Written_by_APPNAME in hadoop conf and in executor side get it from configuration and add to carbonproperty This closes #2928 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2e0153bf Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2e0153bf Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2e0153bf Branch: refs/heads/branch-1.5 Commit: 2e0153bfa20b8d263402dbb67a8c020dd4a63ddd Parents: 6df965b Author: Indhumathi27 <indhumathi...@gmail.com> Authored: Fri Nov 16 21:49:16 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Nov 21 22:43:46 2018 +0530 ---------------------------------------------------------------------- .../spark/load/DataLoadProcessBuilderOnSpark.scala | 5 ++--- .../spark/load/DataLoadProcessorStepOnSpark.scala | 6 +++++- .../store/writer/v3/CarbonFactDataWriterImplV3.java | 10 +++++++--- 3 files changed, 14 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e0153bf/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 338180d..8ded6bd 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -66,9 +66,8 @@ object DataLoadProcessBuilderOnSpark { val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator") val writeStepRowCounter = sc.accumulator(0, "Write Processor 
Accumulator") - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, - sparkSession.sparkContext.appName) + hadoopConf + .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName) val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf) // 1. Input http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e0153bf/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 0a68fb0..2ca47b3 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.util.ThreadLocalSessionInfo +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations} import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException @@ -228,6 +229,9 @@ object DataLoadProcessorStepOnSpark 
{ modelBroadcast: Broadcast[CarbonLoadModel], rowCounter: Accumulator[Int], conf: Configuration) { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, + conf.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)) ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf) var model: CarbonLoadModel = null var tableName: String = null http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e0153bf/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java index f168796..ccbc544 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java @@ -104,9 +104,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter { .convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality, thriftColumnSchemaList.size()); convertFileMeta.setIs_sort(isSorted); - convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO, - CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)); + String appName = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME); + if (appName == null) { + throw new CarbonDataWriterException( + "DataLoading failed as CARBON_WRITTEN_BY_APPNAME is null"); + } + convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO, appName); convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_VERSION, CarbonVersionConstants.CARBONDATA_VERSION); // 
fill the carbon index details