[REBASE] Resolve conflicts after rebasing onto master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6944dd42 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6944dd42 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6944dd42 Branch: refs/heads/carbonstore-rebase5 Commit: 6944dd420a07f6fe694a7fed92ed256bea68d314 Parents: 1921393 Author: Jacky Li <jacky.li...@qq.com> Authored: Thu Feb 1 00:25:31 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Fri Mar 2 15:51:53 2018 +0800 ---------------------------------------------------------------------- .../hadoop/util/CarbonInputFormatUtil.java | 20 +++++++++++++++++++ .../spark/rdd/NewCarbonDataLoadRDD.scala | 21 ++------------------ .../org/apache/spark/sql/CarbonSession.scala | 5 ++--- 3 files changed, 24 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6944dd42/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 514428b..056c27b 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -22,6 +22,8 @@ import java.text.SimpleDateFormat; import java.util.List; import java.util.Locale; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; @@ -39,6 +41,7 @@ 
import org.apache.carbondata.core.scan.model.QueryMeasure; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; @@ -159,4 +162,21 @@ public class CarbonInputFormatUtil { String jobtrackerID = createJobTrackerID(date); return new JobID(jobtrackerID, batch); } + + public static void setS3Configurations(Configuration hadoopConf) { + FileFactory.getConfiguration() + .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", "")); + FileFactory.getConfiguration() + .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", "")); + FileFactory.getConfiguration() + .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", "")); + FileFactory.getConfiguration().set(CarbonCommonConstants.S3_ACCESS_KEY, + hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, "")); + FileFactory.getConfiguration().set(CarbonCommonConstants.S3_SECRET_KEY, + hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, "")); + FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_ACCESS_KEY, + hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, "")); + FileFactory.getConfiguration().set(CarbonCommonConstants.S3N_SECRET_KEY, + hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, "")); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6944dd42/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 917fc88..e17824f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -41,10 +41,10 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.common.logging.impl.StandardLogService import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.compression.CompressorFactory -import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo} import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations} import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator} import org.apache.carbondata.processing.loading.exception.NoRetryException @@ -371,7 +371,7 @@ class NewDataFrameLoaderRDD[K, V]( override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val hadoopConf = getConf - setS3Configurations(hadoopConf) + CarbonInputFormatUtil.setS3Configurations(hadoopConf) val iter = new Iterator[(K, V)] { val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") @@ -441,23 +441,6 @@ class NewDataFrameLoaderRDD[K, V]( iter } override protected def getPartitions: Array[Partition] = firstParent[Row].partitions - - private def setS3Configurations(hadoopConf: Configuration): Unit = { - FileFactory.getConfiguration - .set("fs.s3a.access.key", hadoopConf.get("fs.s3a.access.key", "")) - FileFactory.getConfiguration - .set("fs.s3a.secret.key", hadoopConf.get("fs.s3a.secret.key", "")) - 
FileFactory.getConfiguration - .set("fs.s3a.endpoint", hadoopConf.get("fs.s3a.endpoint", "")) - FileFactory.getConfiguration.set(CarbonCommonConstants.S3_ACCESS_KEY, - hadoopConf.get(CarbonCommonConstants.S3_ACCESS_KEY, "")) - FileFactory.getConfiguration.set(CarbonCommonConstants.S3_SECRET_KEY, - hadoopConf.get(CarbonCommonConstants.S3_SECRET_KEY, "")) - FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_ACCESS_KEY, - hadoopConf.get(CarbonCommonConstants.S3N_ACCESS_KEY, "")) - FileFactory.getConfiguration.set(CarbonCommonConstants.S3N_SECRET_KEY, - hadoopConf.get(CarbonCommonConstants.S3N_SECRET_KEY, "")) - } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/6944dd42/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 935b0a6..bf958f8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -21,7 +21,6 @@ import java.io.File import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder @@ -31,8 +30,8 @@ import org.apache.spark.sql.internal.{SessionState, SharedState} import org.apache.spark.util.{CarbonReflectionUtils, Utils} import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} +import 
org.apache.carbondata.hadoop.util.CarbonInputFormatUtil /** * Session implementation for {org.apache.spark.sql.SparkSession} @@ -154,7 +153,7 @@ object CarbonSession { sparkConf.setAppName(randomAppName) } val sc = SparkContext.getOrCreate(sparkConf) - setS3Configurations(sc) + CarbonInputFormatUtil.setS3Configurations(sc.hadoopConfiguration) // maybe this is an existing SparkContext, update its SparkConf which maybe used // by SparkSession options.foreach { case (k, v) => sc.conf.set(k, v) }