svn commit: r26496 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_24_20_01-5fea17b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Apr 25 03:15:17 2018
New Revision: 26496

Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_24_20_01-5fea17b docs

[This commit notification would consist of 1460 parts, which exceeds the limit of 50, so it was shortened to this summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23821][SQL] Collection function: flatten
Repository: spark
Updated Branches: refs/heads/master d6c26d1c9 -> 5fea17b3b

[SPARK-23821][SQL] Collection function: flatten

## What changes were proposed in this pull request?

This PR adds a new collection function that transforms an array of arrays into a single array. The PR comprises:
- An expression for flattening array structure
- Flatten function
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples

### Primitive type

```
val df = Seq(
  Seq(Seq(1, 2), Seq(4, 5)),
  Seq(null, Seq(1))
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(flatten($"i")).debugCodegen
```

Result:

```
/* 033 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */ ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */   null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */ boolean filter_value = true;
/* 038 */
/* 039 */ if (!(!inputadapter_isNull)) {
/* 040 */   filter_value = inputadapter_isNull;
/* 041 */ }
/* 042 */ if (!filter_value) continue;
/* 043 */
/* 044 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */ boolean project_isNull = inputadapter_isNull;
/* 047 */ ArrayData project_value = null;
/* 048 */
/* 049 */ if (!inputadapter_isNull) {
/* 050 */   for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */     project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */   }
/* 053 */   if (!project_isNull) {
/* 054 */     long project_numElements = 0;
/* 055 */     for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */       project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */     }
/* 058 */     if (project_numElements > 2147483632) {
/* 059 */       throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */         project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */     }
/* 062 */
/* 063 */     long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 064 */       project_numElements,
/* 065 */       4);
/* 066 */     if (project_size > 2147483632) {
/* 067 */       throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 068 */         project_size + " bytes of data due to exceeding the limit 2147483632" +
/* 069 */         " bytes for UnsafeArrayData.");
/* 070 */     }
/* 071 */
/* 072 */     byte[] project_array = new byte[(int)project_size];
/* 073 */     UnsafeArrayData project_tempArrayData = new UnsafeArrayData();
/* 074 */     Platform.putLong(project_array, 16, project_numElements);
/* 075 */     project_tempArrayData.pointTo(project_array, 16, (int)project_size);
/* 076 */     int project_counter = 0;
/* 077 */     for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 078 */       ArrayData arr = inputadapter_value.getArray(k);
/* 079 */       for (int l = 0; l < arr.numElements(); l++) {
/* 080 */         if (arr.isNullAt(l)) {
/* 081 */           project_tempArrayData.setNullAt(project_counter);
/* 082 */         } else {
/* 083 */           project_tempArrayData.setInt(
/* 084 */             project_counter,
/* 085 */             arr.getInt(l)
/* 086 */           );
/* 087 */         }
/* 088 */         project_counter++;
/* 089 */       }
/* 090 */     }
/* 091 */     project_value = project_tempArrayData;
/* 092 */
/* 093 */   }
/* 094 */
/* 095 */ }
```

### Non-primitive type

```
val df = Seq(
  Seq(Seq("a", "b"), Seq(null, "d")),
  Seq(null, Seq("a"))
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(flatten($"s")).debugCodegen
```

Result:

```
/* 033 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */ ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */   null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */ boolean filter_value = true;
/* 038 */
/* 039 */ if (!(!inputadapter_isNull)) {
/* 040 */   filter_value = inputadapter_isNull;
/* 041 */ }
/* 042 */ if (!filter_value) continue;
/* 043 */
/* 044 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */ boolean project_isNull = inputadapter_isNull;
/* 047 */ ArrayData project_value = null;
/* 048 */
/* 049 */ if (!inputadapter_isNull) {
/* 050 */   for (int z = 0; !project_isNull && z <
```
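For readers interested in the behaviour rather than the generated code, here is a minimal usage sketch, assuming a local SparkSession on a build that includes this patch; the printed output is illustrative:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.flatten

val spark = SparkSession.builder().master("local[*]").appName("flatten-sketch").getOrCreate()
import spark.implicits._

val df = Seq(
  Seq(Seq(1, 2), Seq(3, 4)),   // flattened to [1, 2, 3, 4]
  Seq(null, Seq(5))            // a null inner array makes the whole result null
).toDF("i")

df.select(flatten($"i")).show()
// +------------+
// |  flatten(i)|
// +------------+
// |[1, 2, 3, 4]|
// |        null|
// +------------+
```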
spark git commit: [SPARK-24038][SS] Refactor continuous writing to its own class
Repository: spark Updated Branches: refs/heads/master 7b1e6523a -> d6c26d1c9 [SPARK-24038][SS] Refactor continuous writing to its own class ## What changes were proposed in this pull request? Refactor continuous writing to its own class. See WIP https://github.com/jose-torres/spark/pull/13 for the overall direction this is going, but I think this PR is very isolated and necessary anyway. ## How was this patch tested? existing unit tests - refactoring only Author: Jose TorresCloses #21116 from jose-torres/SPARK-24038. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6c26d1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6c26d1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6c26d1c Branch: refs/heads/master Commit: d6c26d1c9a8f747a3e0d281a27ea9eb4d92102e5 Parents: 7b1e652 Author: Jose Torres Authored: Tue Apr 24 17:06:03 2018 -0700 Committer: Tathagata Das Committed: Tue Apr 24 17:06:03 2018 -0700 -- .../datasources/v2/DataSourceV2Strategy.scala | 4 + .../datasources/v2/WriteToDataSourceV2.scala| 74 +-- .../continuous/ContinuousExecution.scala| 2 +- .../WriteToContinuousDataSource.scala | 31 + .../WriteToContinuousDataSourceExec.scala | 124 +++ 5 files changed, 165 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index 1ac9572..c2a3144 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.Strategy import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec} object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { @@ -32,6 +33,9 @@ object DataSourceV2Strategy extends Strategy { case WriteToDataSourceV2(writer, query) => WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil +case WriteToContinuousDataSource(writer, query) => + WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil + case _ => Nil } } http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala index e80b44c..ea283ed 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala @@ -65,25 +65,10 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e s"The input RDD has ${messages.length} partitions.") try { - val runTask = writer match { -// This case means that we're doing continuous processing. 
In microbatch streaming, the -// StreamWriter is wrapped in a MicroBatchWriter, which is executed as a normal batch. -case w: StreamWriter => - EpochCoordinatorRef.get( - sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY), -sparkContext.env) -.askSync[Unit](SetWriterPartitions(rdd.getNumPartitions)) - - (context: TaskContext, iter: Iterator[InternalRow]) => -DataWritingSparkTask.runContinuous(writeTask, context, iter) -case _ => - (context: TaskContext, iter: Iterator[InternalRow]) => -DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator) - } - sparkContext.runJob( rdd, -runTask, +(context: TaskContext, iter: Iterator[InternalRow]) => + DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator),
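As a rough, standalone illustration of the planning pattern this refactor follows (a dedicated logical node for continuous writes, matched by the strategy and translated into its own exec node), here is a simplified sketch; the case classes below are illustrative stand-ins, not the actual Spark internals:

```
// Simplified stand-ins for logical and physical plan nodes.
sealed trait LogicalNode
case class ContinuousWrite(sink: String, child: LogicalNode) extends LogicalNode
case class BatchWrite(sink: String, child: LogicalNode) extends LogicalNode
case class Scan(source: String) extends LogicalNode

sealed trait PhysicalNode
case class ContinuousWriteExec(sink: String, child: PhysicalNode) extends PhysicalNode
case class BatchWriteExec(sink: String, child: PhysicalNode) extends PhysicalNode
case class ScanExec(source: String) extends PhysicalNode

// The "strategy": each write mode now maps to its own exec node
// instead of one shared writer exec handling both cases.
def plan(node: LogicalNode): PhysicalNode = node match {
  case ContinuousWrite(sink, child) => ContinuousWriteExec(sink, plan(child))
  case BatchWrite(sink, child)      => BatchWriteExec(sink, plan(child))
  case Scan(source)                 => ScanExec(source)
}

// A continuous write plans straight to the continuous exec node.
println(plan(ContinuousWrite("kafka", Scan("rate"))))
```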
svn commit: r26492 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_24_16_01-7b1e652-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Apr 24 23:15:37 2018
New Revision: 26492

Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_24_16_01-7b1e652 docs

[This commit notification would consist of 1460 parts, which exceeds the limit of 50, so it was shortened to this summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming
Repository: spark Updated Branches: refs/heads/master 379bffa05 -> 7b1e6523a [SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming ## What changes were proposed in this pull request? Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) eagerly creates a consumer as soon as the Kafk aMicroBatchReader is created. However, we create dummy KafkaMicroBatchReader to get the schema and immediately stop it. Its better to make the consumer creation lazy, it will be created on the first attempt to fetch offsets using the KafkaOffsetReader. ## How was this patch tested? Existing unit tests Author: Tathagata DasCloses #21134 from tdas/SPARK-24056. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b1e6523 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b1e6523 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b1e6523 Branch: refs/heads/master Commit: 7b1e6523af3c96043aa8d2763e5f18b6e2781c3d Parents: 379bffa Author: Tathagata Das Authored: Tue Apr 24 14:33:33 2018 -0700 Committer: Tathagata Das Committed: Tue Apr 24 14:33:33 2018 -0700 -- .../spark/sql/kafka010/KafkaOffsetReader.scala | 31 +++- 1 file changed, 17 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7b1e6523/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 551641c..8206669 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -75,7 +75,17 @@ private[kafka010] class KafkaOffsetReader( * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the * offsets and never commits them. */ - protected var consumer = createConsumer() + @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null + + protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized { +assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) +if (_consumer == null) { + val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams) + newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId()) + _consumer = consumerStrategy.createConsumer(newKafkaParams) +} +_consumer + } private val maxOffsetFetchAttempts = readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt @@ -95,9 +105,7 @@ private[kafka010] class KafkaOffsetReader( * Closes the connection to Kafka, and cleans up state. */ def close(): Unit = { -runUninterruptibly { - consumer.close() -} +if (_consumer != null) runUninterruptibly { stopConsumer() } kafkaReaderThread.shutdown() } @@ -304,19 +312,14 @@ private[kafka010] class KafkaOffsetReader( } } - /** - * Create a consumer using the new generated group id. We always use a new consumer to avoid - * just using a broken consumer to retry on Kafka errors, which likely will fail again. 
- */ - private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized { -val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams) -newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId()) -consumerStrategy.createConsumer(newKafkaParams) + private def stopConsumer(): Unit = synchronized { +assert(Thread.currentThread().isInstanceOf[UninterruptibleThread]) +if (_consumer != null) _consumer.close() } private def resetConsumer(): Unit = synchronized { -consumer.close() -consumer = createConsumer() +stopConsumer() +_consumer = null // will automatically get reinitialized again } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
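The essence of this change is a lazy-initialization pattern: the consumer field starts out null, is created under a lock on first use, and close/reset tolerate the never-created case. A standalone sketch of that pattern follows, with illustrative types rather than the actual Kafka classes:

```
// Lazily created, resettable resource holder (illustrative, not the Spark/Kafka code).
class LazyResource[T <: AnyRef](create: () => T, close: T => Unit) {
  @volatile private var _resource: T = null.asInstanceOf[T]

  // Created on first access rather than at construction time.
  def get: T = synchronized {
    if (_resource == null) _resource = create()
    _resource
  }

  // Safe to call even if the resource was never created.
  def stop(): Unit = synchronized {
    if (_resource != null) close(_resource)
    _resource = null.asInstanceOf[T]
  }

  // Dropping the reference forces re-creation on the next get.
  def reset(): Unit = stop()
}

// Usage: nothing is created until get is first called.
val r = new LazyResource[java.util.ArrayList[String]](
  () => { println("creating"); new java.util.ArrayList[String]() },
  _ => println("closing"))
r.stop()   // no-op: never created
r.get      // prints "creating"
r.reset()  // prints "closing"; a later get would recreate
```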
svn commit: r26486 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_24_12_01-379bffa-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Apr 24 19:15:19 2018
New Revision: 26486

Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_24_12_01-379bffa docs

[This commit notification would consist of 1460 parts, which exceeds the limit of 50, so it was shortened to this summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23990][ML] Instruments logging improvements - ML regression package
Repository: spark Updated Branches: refs/heads/master 83013752e -> 379bffa05 [SPARK-23990][ML] Instruments logging improvements - ML regression package ## What changes were proposed in this pull request? Instruments logging improvements - ML regression package I add an `OptionalInstrument` class which used in `WeightLeastSquares` and `IterativelyReweightedLeastSquares`. ## How was this patch tested? N/A Author: WeichenXuCloses #21078 from WeichenXu123/inst_reg. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/379bffa0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/379bffa0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/379bffa0 Branch: refs/heads/master Commit: 379bffa0525a4343f8c10e51ed192031922f9874 Parents: 8301375 Author: WeichenXu Authored: Tue Apr 24 11:02:22 2018 -0700 Committer: Joseph K. Bradley Committed: Tue Apr 24 11:02:22 2018 -0700 -- .../ml/classification/LogisticRegression.scala | 4 +- .../IterativelyReweightedLeastSquares.scala | 18 -- .../spark/ml/optim/WeightedLeastSquares.scala | 32 + .../ml/regression/AFTSurvivalRegression.scala | 2 +- .../GeneralizedLinearRegression.scala | 14 ++-- .../spark/ml/regression/LinearRegression.scala | 22 --- .../spark/ml/tree/impl/RandomForest.scala | 2 + .../apache/spark/ml/util/Instrumentation.scala | 68 +++- 8 files changed, 125 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index e426263..06ca37b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -500,7 +500,7 @@ class LogisticRegression @Since("1.2.0") ( if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) -val instr = Instrumentation.create(this, instances) +val instr = Instrumentation.create(this, dataset) instr.logParams(regParam, elasticNetParam, standardization, threshold, maxIter, tol, fitIntercept) @@ -816,7 +816,7 @@ class LogisticRegression @Since("1.2.0") ( if (state == null) { val msg = s"${optimizer.getClass.getName} failed." 
- logError(msg) + instr.logError(msg) throw new SparkException(msg) } http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala index 6961b45..572b8cf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala @@ -17,9 +17,9 @@ package org.apache.spark.ml.optim -import org.apache.spark.internal.Logging import org.apache.spark.ml.feature.{Instance, OffsetInstance} import org.apache.spark.ml.linalg._ +import org.apache.spark.ml.util.OptionalInstrumentation import org.apache.spark.rdd.RDD /** @@ -61,9 +61,12 @@ private[ml] class IterativelyReweightedLeastSquares( val fitIntercept: Boolean, val regParam: Double, val maxIter: Int, -val tol: Double) extends Logging with Serializable { +val tol: Double) extends Serializable { - def fit(instances: RDD[OffsetInstance]): IterativelyReweightedLeastSquaresModel = { + def fit( + instances: RDD[OffsetInstance], + instr: OptionalInstrumentation = OptionalInstrumentation.create( +classOf[IterativelyReweightedLeastSquares])): IterativelyReweightedLeastSquaresModel = { var converged = false var iter = 0 @@ -83,7 +86,8 @@ private[ml] class IterativelyReweightedLeastSquares( // Estimate new model model = new WeightedLeastSquares(fitIntercept, regParam, elasticNetParam = 0.0, -standardizeFeatures = false, standardizeLabel = false).fit(newInstances) +standardizeFeatures = false, standardizeLabel = false) +.fit(newInstances, instr = instr) // Check convergence val oldCoefficients = oldModel.coefficients @@ -96,14 +100,14 @@ private[ml]
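A standalone sketch of the idea behind an optional instrumentation wrapper: low-level solvers log through a caller-supplied instrumentation object when one is passed in, and otherwise fall back to a plain class-scoped logger. All names below are illustrative, not the actual MLlib API:

```
trait Instr { def logInfo(msg: String): Unit }

class CallerInstr(prefix: String) extends Instr {
  def logInfo(msg: String): Unit = println(s"[$prefix] $msg")
}

class FallbackInstr(className: String) extends Instr {
  def logInfo(msg: String): Unit = println(s"[$className] $msg")
}

object OptionalInstr {
  // Use the caller's instrumentation if given, else a class-scoped fallback.
  def create(caller: Option[Instr], owner: Class[_]): Instr =
    caller.getOrElse(new FallbackInstr(owner.getName))
}

class Solver {
  // The default argument keeps existing call sites working without an instrument.
  def fit(data: Seq[Double],
          instr: Instr = OptionalInstr.create(None, classOf[Solver])): Double = {
    instr.logInfo(s"fitting on ${data.size} points")
    data.sum / data.size
  }
}

new Solver().fit(Seq(1.0, 2.0, 3.0))                                  // fallback logger
new Solver().fit(Seq(1.0, 2.0), new CallerInstr("LinearRegression"))  // caller's logger
```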
spark git commit: [SPARK-23455][ML] Default Params in ML should be saved separately in metadata
Repository: spark Updated Branches: refs/heads/master ce7ba2e98 -> 83013752e [SPARK-23455][ML] Default Params in ML should be saved separately in metadata ## What changes were proposed in this pull request? We save ML's user-supplied params and default params as one entity in metadata. During loading the saved models, we set all the loaded params into created ML model instances as user-supplied params. It causes some problems, e.g., if we strictly disallow some params to be set at the same time, a default param can fail the param check because it is treated as user-supplied param after loading. The loaded default params should not be set as user-supplied params. We should save ML default params separately in metadata. For backward compatibility, when loading metadata, if it is a metadata file from previous Spark, we shouldn't raise error if we can't find the default param field. ## How was this patch tested? Pass existing tests and added tests. Author: Liang-Chi HsiehCloses #20633 from viirya/save-ml-default-params. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83013752 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83013752 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83013752 Branch: refs/heads/master Commit: 83013752e3cfcbc3edeef249439ac20b143eeabc Parents: ce7ba2e Author: Liang-Chi Hsieh Authored: Tue Apr 24 10:40:25 2018 -0700 Committer: Joseph K. Bradley Committed: Tue Apr 24 10:40:25 2018 -0700 -- .../classification/DecisionTreeClassifier.scala | 2 +- .../spark/ml/classification/GBTClassifier.scala | 4 +- .../spark/ml/classification/LinearSVC.scala | 2 +- .../ml/classification/LogisticRegression.scala | 2 +- .../MultilayerPerceptronClassifier.scala| 2 +- .../spark/ml/classification/NaiveBayes.scala| 2 +- .../spark/ml/classification/OneVsRest.scala | 4 +- .../classification/RandomForestClassifier.scala | 4 +- .../spark/ml/clustering/BisectingKMeans.scala | 2 +- .../spark/ml/clustering/GaussianMixture.scala | 2 +- .../org/apache/spark/ml/clustering/KMeans.scala | 2 +- .../org/apache/spark/ml/clustering/LDA.scala| 4 +- .../feature/BucketedRandomProjectionLSH.scala | 2 +- .../apache/spark/ml/feature/Bucketizer.scala| 24 .../apache/spark/ml/feature/ChiSqSelector.scala | 2 +- .../spark/ml/feature/CountVectorizer.scala | 2 +- .../scala/org/apache/spark/ml/feature/IDF.scala | 2 +- .../org/apache/spark/ml/feature/Imputer.scala | 2 +- .../apache/spark/ml/feature/MaxAbsScaler.scala | 2 +- .../apache/spark/ml/feature/MinHashLSH.scala| 2 +- .../apache/spark/ml/feature/MinMaxScaler.scala | 2 +- .../ml/feature/OneHotEncoderEstimator.scala | 2 +- .../scala/org/apache/spark/ml/feature/PCA.scala | 2 +- .../spark/ml/feature/QuantileDiscretizer.scala | 24 .../org/apache/spark/ml/feature/RFormula.scala | 6 +- .../spark/ml/feature/StandardScaler.scala | 2 +- .../apache/spark/ml/feature/StringIndexer.scala | 2 +- .../apache/spark/ml/feature/VectorIndexer.scala | 2 +- .../org/apache/spark/ml/feature/Word2Vec.scala | 2 +- .../org/apache/spark/ml/fpm/FPGrowth.scala | 2 +- .../org/apache/spark/ml/param/params.scala | 13 +- .../apache/spark/ml/recommendation/ALS.scala| 2 +- .../ml/regression/AFTSurvivalRegression.scala | 2 +- .../ml/regression/DecisionTreeRegressor.scala | 2 +- .../spark/ml/regression/GBTRegressor.scala | 4 +- .../GeneralizedLinearRegression.scala | 2 +- .../ml/regression/IsotonicRegression.scala | 2 +- .../spark/ml/regression/LinearRegression.scala | 2 +- .../ml/regression/RandomForestRegressor.scala 
| 4 +- .../apache/spark/ml/tuning/CrossValidator.scala | 6 +- .../spark/ml/tuning/TrainValidationSplit.scala | 6 +- .../org/apache/spark/ml/util/ReadWrite.scala| 130 --- .../spark/ml/util/DefaultReadWriteTest.scala| 73 ++- project/MimaExcludes.scala | 6 + 44 files changed, 223 insertions(+), 147 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/83013752/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala index 771cd4f..57797d1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala +++
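Conceptually, the fix splits the persisted params into two maps and only re-applies the first as user-supplied on load, tolerating older metadata that lacks the second map. A much-simplified, illustrative sketch of that split (field and method names are assumptions, not the actual ReadWrite code):

```
case class ParamMetadata(paramMap: Map[String, String], defaultParamMap: Map[String, String])

class Model {
  var userSet: Map[String, String] = Map.empty
  var defaults: Map[String, String] = Map("maxIter" -> "100")

  def set(name: String, value: String): this.type = { userSet += (name -> value); this }

  // User-supplied and default params are written as two separate maps.
  def save(): ParamMetadata = ParamMetadata(userSet, defaults)
}

def load(meta: ParamMetadata): Model = {
  val m = new Model
  // Older metadata may not carry the default map at all; fall back instead of failing the load.
  m.defaults = Option(meta.defaultParamMap).getOrElse(Map.empty[String, String])
  meta.paramMap.foreach { case (k, v) => m.set(k, v) }   // only user-supplied params are re-set
  m
}

val saved  = (new Model).set("regParam", "0.1").save()
val loaded = load(saved)
assert(loaded.userSet == Map("regParam" -> "0.1"))   // maxIter stays a default, not user-set
```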
spark git commit: [SPARK-23807][BUILD] Add Hadoop 3.1 profile with relevant POM fix ups
Repository: spark Updated Branches: refs/heads/master 2a24c481d -> ce7ba2e98 [SPARK-23807][BUILD] Add Hadoop 3.1 profile with relevant POM fix ups ## What changes were proposed in this pull request? 1. Adds a `hadoop-3.1` profile build depending on the hadoop-3.1 artifacts. 1. In the hadoop-cloud module, adds an explicit hadoop-3.1 profile which switches from explicitly pulling in cloud connectors (hadoop-openstack, hadoop-aws, hadoop-azure) to depending on the hadoop-cloudstorage POM artifact, which pulls these in, has pre-excluded things like hadoop-common, and stays up to date with new connectors (hadoop-azuredatalake, hadoop-allyun). Goal: it becomes the Hadoop projects homework of keeping this clean, and the spark project doesn't need to handle new hadoop releases adding more dependencies. 1. the hadoop-cloud/hadoop-3.1 profile also declares support for jetty-ajax and jetty-util to ensure that these jars get into the distribution jar directory when needed by unshaded libraries. 1. Increases the curator and zookeeper versions to match those in hadoop-3, fixing spark core to build in sbt with the hadoop-3 dependencies. ## How was this patch tested? * Everything this has been built and tested against both ASF Hadoop branch-3.1 and hadoop trunk. * spark-shell was used to create connectors to all the stores and verify that file IO could take place. The spark hive-1.2.1 JAR has problems here, as it's version check logic fails for Hadoop versions > 2. This can be avoided with either of * The hadoop JARs built to declare their version as Hadoop 2.11 `mvn install -DskipTests -DskipShade -Ddeclared.hadoop.version=2.11` . This is safe for local test runs, not for deployment (HDFS is very strict about cross-version deployment). * A modified version of spark hive whose version check switch statement is happy with hadoop 3. I've done both, with maven and SBT. Three issues surfaced 1. A spark-core test failure âfixed in SPARK-23787. 1. SBT only: Zookeeper not being found in spark-core. Somehow curator 2.12.0 triggers some slightly different dependency resolution logic from previous versions, and Ivy was missing zookeeper.jar entirely. This patch adds the explicit declaration for all spark profiles, setting the ZK version = 3.4.9 for hadoop-3.1 1. Marking jetty-utils as provided in spark was stopping hadoop-azure from being able to instantiate the azure wasb:// client; it was using jetty-util-ajax, which could then not find a class in jetty-util. Author: Steve LoughranCloses #20923 from steveloughran/cloud/SPARK-23807-hadoop-31. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce7ba2e9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce7ba2e9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce7ba2e9 Branch: refs/heads/master Commit: ce7ba2e98e0a3b038e881c271b5905058c43155b Parents: 2a24c48 Author: Steve Loughran Authored: Tue Apr 24 09:57:09 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Apr 24 09:57:09 2018 -0700 -- assembly/pom.xml | 8 ++ core/pom.xml | 6 + dev/deps/spark-deps-hadoop-3.1 | 221 dev/test-dependencies.sh | 1 + hadoop-cloud/pom.xml | 83 +- pom.xml| 9 ++ 6 files changed, 327 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ce7ba2e9/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index a207dae..9608c96 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -254,6 +254,14 @@ spark-hadoop-cloud_${scala.binary.version} ${project.version} + + + org.eclipse.jetty + jetty-util + ${hadoop.deps.scope} + http://git-wip-us.apache.org/repos/asf/spark/blob/ce7ba2e9/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 9258a85..093a986 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -95,6 +95,12 @@ org.apache.curator curator-recipes + + + org.apache.zookeeper + zookeeper + http://git-wip-us.apache.org/repos/asf/spark/blob/ce7ba2e9/dev/deps/spark-deps-hadoop-3.1 -- diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1 new file mode 100644 index 000..97ad65a --- /dev/null +++ b/dev/deps/spark-deps-hadoop-3.1 @@ -0,0 +1,221 @@ +HikariCP-java7-2.4.12.jar +JavaEWAH-0.3.2.jar +RoaringBitmap-0.5.11.jar
spark git commit: [SPARK-23975][ML] Allow Clustering to take Arrays of Double as input features
Repository: spark Updated Branches: refs/heads/master 55c4ca88a -> 2a24c481d [SPARK-23975][ML] Allow Clustering to take Arrays of Double as input features ## What changes were proposed in this pull request? - Multiple possible input types is added in validateAndTransformSchema() and computeCost() while checking column type - Add if statement in transform() to support array type as featuresCol - Add the case statement in fit() while selecting columns from dataset These changes will be applied to KMeans first, then to other clustering method ## How was this patch tested? unit test is added Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Lu WANGCloses #21081 from ludatabricks/SPARK-23975. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a24c481 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a24c481 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a24c481 Branch: refs/heads/master Commit: 2a24c481da3f30b510deb62e5cf21c9463cf250c Parents: 55c4ca8 Author: Lu WANG Authored: Tue Apr 24 09:25:41 2018 -0700 Committer: Joseph K. Bradley Committed: Tue Apr 24 09:25:41 2018 -0700 -- .../org/apache/spark/ml/clustering/KMeans.scala | 32 +++--- .../org/apache/spark/ml/util/DatasetUtils.scala | 63 .../spark/ml/clustering/KMeansSuite.scala | 38 3 files changed, 126 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2a24c481/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 1ad157a..d475c72 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -33,8 +33,8 @@ import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.VersionUtils.majorVersion @@ -87,12 +87,23 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe def getInitSteps: Int = $(initSteps) /** + * Validates the input schema. + * @param schema input schema + */ + private[clustering] def validateSchema(schema: StructType): Unit = { +val typeCandidates = List( new VectorUDT, + new ArrayType(DoubleType, false), + new ArrayType(FloatType, false)) + +SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) + } + /** * Validates and transforms the input schema. 
* @param schema input schema * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { -SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) +validateSchema(schema) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) } } @@ -125,8 +136,11 @@ class KMeansModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) + val predictUDF = udf((vector: Vector) => predict(vector)) -dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol + +dataset.withColumn($(predictionCol), + predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol))) } @Since("1.5.0") @@ -146,8 +160,10 @@ class KMeansModel private[ml] ( // TODO: Replace the temp fix when we have proper evaluators defined for clustering. @Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { -SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) -val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { +validateSchema(dataset.schema) + +val data: RDD[OldVector] = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol)) + .rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } parentModel.computeCost(data) @@ -335,7 +351,9 @@ class KMeans @Since("1.5.0") ( transformSchema(dataset.schema, logging = true) val handlePersistence = dataset.storageLevel ==
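A usage sketch of the new behaviour: the features column may now be array&lt;double&gt; (or array&lt;float&gt;) instead of a Vector column, so no VectorAssembler round-trip is needed. This assumes a local SparkSession on a build that includes this patch; the data values are illustrative:

```
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("kmeans-array-sketch").getOrCreate()
import spark.implicits._

// Feature column is array<double>, not a VectorUDT column.
val df = Seq(
  Array(0.0, 0.0), Array(0.1, 0.1),
  Array(9.0, 9.0), Array(9.1, 8.9)
).toDF("features")

val model = new KMeans().setK(2).setSeed(1L).setFeaturesCol("features").fit(df)
model.transform(df).show()        // adds an integer "prediction" column
println(model.computeCost(df))    // computeCost accepts the array column as well
```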
spark git commit: [SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation
Repository: spark Updated Branches: refs/heads/master 4926a7c2f -> 55c4ca88a [SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation ## What changes were proposed in this pull request? By default, the dynamic allocation will request enough executors to maximize the parallelism according to the number of tasks to process. While this minimizes the latency of the job, with small tasks this setting can waste a lot of resources due to executor allocation overhead, as some executor might not even do any work. This setting allows to set a ratio that will be used to reduce the number of target executors w.r.t. full parallelism. The number of executors computed with this setting is still fenced by `spark.dynamicAllocation.maxExecutors` and `spark.dynamicAllocation.minExecutors` ## How was this patch tested? Units tests and runs on various actual workloads on a Yarn Cluster Author: Julien CuquemelleCloses #19881 from jcuquemelle/AddTaskPerExecutorSlot. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55c4ca88 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55c4ca88 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55c4ca88 Branch: refs/heads/master Commit: 55c4ca88a3b093ee197a8689631be8d1fac1f10f Parents: 4926a7c Author: Julien Cuquemelle Authored: Tue Apr 24 10:56:55 2018 -0500 Committer: Thomas Graves Committed: Tue Apr 24 10:56:55 2018 -0500 -- .../spark/ExecutorAllocationManager.scala | 24 +++--- .../apache/spark/internal/config/package.scala | 4 +++ .../spark/ExecutorAllocationManagerSuite.scala | 33 docs/configuration.md | 18 +++ 4 files changed, 74 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/55c4ca88/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 189d913..aa363ee 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS} +import org.apache.spark.internal.config._ import org.apache.spark.metrics.source.Source import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerMaster @@ -69,6 +69,10 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} * spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors * spark.dynamicAllocation.initialExecutors - Number of executors to start with * + * spark.dynamicAllocation.executorAllocationRatio - + * This is used to reduce the parallelism of the dynamic allocation that can waste + * resources when tasks are small + * * spark.dynamicAllocation.schedulerBacklogTimeout (M) - * If there are backlogged tasks for this duration, add new executors * @@ -116,9 +120,12 @@ private[spark] class ExecutorAllocationManager( // TODO: The default value of 1 for spark.executor.cores works right now because dynamic // allocation is only supported for YARN and the default number of cores per executor in YARN is // 1, but it might need to be attained differently for different cluster managers - 
private val tasksPerExecutor = + private val tasksPerExecutorForFullParallelism = conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1) + private val executorAllocationRatio = +conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO) + validateSettings() // Number of executors to add in the next round @@ -209,8 +216,13 @@ private[spark] class ExecutorAllocationManager( throw new SparkException("Dynamic allocation of executors requires the external " + "shuffle service. You may enable this through spark.shuffle.service.enabled.") } -if (tasksPerExecutor == 0) { - throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.") +if (tasksPerExecutorForFullParallelism == 0) { + throw new SparkException("spark.executor.cores must not be < spark.task.cpus.") +} + +if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) { + throw new SparkException( +
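A standalone sketch of the scaling this parameter introduces; the real computation lives in ExecutorAllocationManager, so the names and exact rounding below are illustrative:

```
// Scale the executor target down from full parallelism, bounded by min/max.
def targetExecutors(runningOrPendingTasks: Int,
                    tasksPerExecutor: Int,
                    allocationRatio: Double,
                    minExecutors: Int,
                    maxExecutors: Int): Int = {
  // Full parallelism would request ceil(tasks / tasksPerExecutor) executors;
  // the ratio shrinks that target, within the configured bounds.
  val scaled = math.ceil(runningOrPendingTasks * allocationRatio / tasksPerExecutor).toInt
  math.min(math.max(scaled, minExecutors), maxExecutors)
}

// 1000 small tasks with 4 tasks per executor: the default ratio of 1.0 asks for 250
// executors, while spark.dynamicAllocation.executorAllocationRatio = 0.2 asks for 50.
println(targetExecutors(1000, 4, 1.0, 0, 500))  // 250
println(targetExecutors(1000, 4, 0.2, 0, 500))  // 50
```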
spark git commit: [SPARK-23589][SQL][FOLLOW-UP] Reuse InternalRow in ExternalMapToCatalyst eval
Repository: spark Updated Branches: refs/heads/master 87e8a572b -> 4926a7c2f [SPARK-23589][SQL][FOLLOW-UP] Reuse InternalRow in ExternalMapToCatalyst eval ## What changes were proposed in this pull request? This pr is a follow-up of #20980 and fixes code to reuse `InternalRow` for converting input keys/values in `ExternalMapToCatalyst` eval. ## How was this patch tested? Existing tests. Author: Takeshi YamamuroCloses #21137 from maropu/SPARK-23589-FOLLOWUP. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4926a7c2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4926a7c2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4926a7c2 Branch: refs/heads/master Commit: 4926a7c2f0a47b562f99dbb4f1ca17adb3192061 Parents: 87e8a57 Author: Takeshi Yamamuro Authored: Tue Apr 24 17:52:05 2018 +0200 Committer: Herman van Hovell Committed: Tue Apr 24 17:52:05 2018 +0200 -- .../catalyst/expressions/objects/objects.scala | 92 +++- 1 file changed, 50 insertions(+), 42 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4926a7c2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 9c7e764..f974fd8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -1255,53 +1255,61 @@ case class ExternalMapToCatalyst private( override def dataType: MapType = MapType( keyConverter.dataType, valueConverter.dataType, valueContainsNull = valueConverter.nullable) - private lazy val mapCatalystConverter: Any => (Array[Any], Array[Any]) = child.dataType match { -case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) => - (input: Any) => { -val data = input.asInstanceOf[java.util.Map[Any, Any]] -val keys = new Array[Any](data.size) -val values = new Array[Any](data.size) -val iter = data.entrySet().iterator() -var i = 0 -while (iter.hasNext) { - val entry = iter.next() - val (key, value) = (entry.getKey, entry.getValue) - keys(i) = if (key != null) { -keyConverter.eval(InternalRow.fromSeq(key :: Nil)) - } else { -throw new RuntimeException("Cannot use null as map key!") - } - values(i) = if (value != null) { -valueConverter.eval(InternalRow.fromSeq(value :: Nil)) - } else { -null + private lazy val mapCatalystConverter: Any => (Array[Any], Array[Any]) = { +val rowBuffer = InternalRow.fromSeq(Array[Any](1)) +def rowWrapper(data: Any): InternalRow = { + rowBuffer.update(0, data) + rowBuffer +} + +child.dataType match { + case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) => +(input: Any) => { + val data = input.asInstanceOf[java.util.Map[Any, Any]] + val keys = new Array[Any](data.size) + val values = new Array[Any](data.size) + val iter = data.entrySet().iterator() + var i = 0 + while (iter.hasNext) { +val entry = iter.next() +val (key, value) = (entry.getKey, entry.getValue) +keys(i) = if (key != null) { + keyConverter.eval(rowWrapper(key)) +} else { + throw new RuntimeException("Cannot use null as map key!") +} +values(i) = if (value != null) { + valueConverter.eval(rowWrapper(value)) +} else { + null +} +i += 1 } - i += 1 + (keys, values) } -(keys, values) - } -case ObjectType(cls) if 
classOf[scala.collection.Map[_, _]].isAssignableFrom(cls) => - (input: Any) => { -val data = input.asInstanceOf[scala.collection.Map[Any, Any]] -val keys = new Array[Any](data.size) -val values = new Array[Any](data.size) -var i = 0 -for ((key, value) <- data) { - keys(i) = if (key != null) { -keyConverter.eval(InternalRow.fromSeq(key :: Nil)) - } else { -throw new RuntimeException("Cannot use null as map key!") - } - values(i) = if (value != null) { -valueConverter.eval(InternalRow.fromSeq(value :: Nil)) - } else {
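The change is essentially a buffer-reuse pattern: one mutable single-field row is allocated up front and overwritten for each key/value, instead of building InternalRow.fromSeq(...) per element. A standalone sketch of that pattern with plain Scala types rather than Catalyst's InternalRow:

```
// A single-field buffer that is reused for every element (illustrative only).
final class SingleFieldBuffer {
  private var value: Any = null
  def update(v: Any): SingleFieldBuffer = { value = v; this }
  def get: Any = value
}

def convertAll(items: Seq[Any], convert: SingleFieldBuffer => Any): Array[Any] = {
  val buffer = new SingleFieldBuffer   // allocated once, not once per element
  val out = new Array[Any](items.size)
  var i = 0
  while (i < items.size) {
    out(i) = convert(buffer.update(items(i)))
    i += 1
  }
  out
}

// "Converting" each element through the shared buffer.
println(convertAll(Seq(1, 2, 3), b => b.get.asInstanceOf[Int] * 10).toSeq)  // List(10, 20, 30)
```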
spark git commit: [SPARK-24054][R] Add array_position function / element_at functions
Repository: spark Updated Branches: refs/heads/master c303b1b67 -> 87e8a572b [SPARK-24054][R] Add array_position function / element_at functions ## What changes were proposed in this pull request? This PR proposes to add array_position and element_at in R side too. array_position: ```r df <- createDataFrame(cbind(model = rownames(mtcars), mtcars)) mutated <- mutate(df, v1 = create_array(df$gear, df$am, df$carb)) head(select(mutated, array_position(mutated$v1, 1))) ``` ``` array_position(v1, 1.0) 1 2 2 2 3 2 4 3 5 0 6 3 ``` element_at: ```r df <- createDataFrame(cbind(model = rownames(mtcars), mtcars)) mutated <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp)) head(select(mutated, element_at(mutated$v1, 1))) ``` ``` element_at(v1, 1.0) 121.0 221.0 322.8 421.4 518.7 618.1 ``` ```r df <- createDataFrame(cbind(model = rownames(mtcars), mtcars)) mutated <- mutate(df, v1 = create_map(df$model, df$cyl)) head(select(mutated, element_at(mutated$v1, "Valiant"))) ``` ``` element_at(v3, Valiant) 1 NA 2 NA 3 NA 4 NA 5 NA 6 6 ``` ## How was this patch tested? Unit tests were added in `R/pkg/tests/fulltests/test_sparkSQL.R` and manually tested. Documentation was manually built and verified. Author: hyukjinkwonCloses #21130 from HyukjinKwon/sparkr_array_position_element_at. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87e8a572 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87e8a572 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87e8a572 Branch: refs/heads/master Commit: 87e8a572be14381da9081365d9aa2cbf3253a32c Parents: c303b1b Author: hyukjinkwon Authored: Tue Apr 24 16:18:20 2018 +0800 Committer: hyukjinkwon Committed: Tue Apr 24 16:18:20 2018 +0800 -- R/pkg/NAMESPACE | 2 ++ R/pkg/R/functions.R | 42 -- R/pkg/R/generics.R| 8 ++ R/pkg/tests/fulltests/test_sparkSQL.R | 13 +++-- 4 files changed, 61 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/87e8a572/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 190c50e..55dec17 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -201,6 +201,7 @@ exportMethods("%<=>%", "approxCountDistinct", "approxQuantile", "array_contains", + "array_position", "asc", "ascii", "asin", @@ -245,6 +246,7 @@ exportMethods("%<=>%", "decode", "dense_rank", "desc", + "element_at", "encode", "endsWith", "exp", http://git-wip-us.apache.org/repos/asf/spark/blob/87e8a572/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index a527426..7b3aa05 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -189,6 +189,11 @@ NULL #' the map or array of maps. #' \item \code{from_json}: it is the column containing the JSON string. #' } +#' @param value A value to compute on. +#' \itemize{ +#' \item \code{array_contains}: a value to be checked if contained in the column. +#' \item \code{array_position}: a value to locate in the given array. +#' } #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains #'additional named properties to control how it is converted, accepts the same #'options as the JSON data source. 
@@ -201,6 +206,7 @@ NULL #' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars)) #' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp)) #' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1))) +#' head(select(tmp, array_position(tmp$v1, 21))) #' tmp2 <- mutate(tmp, v2 = explode(tmp$v1)) #' head(tmp2) #' head(select(tmp, posexplode(tmp$v1))) @@ -208,7 +214,8 @@ NULL #' head(select(tmp, sort_array(tmp$v1, asc = FALSE))) #' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl)) #' head(select(tmp3, map_keys(tmp3$v3))) -#' head(select(tmp3, map_values(tmp3$v3)))} +#' head(select(tmp3, map_values(tmp3$v3))) +#' head(select(tmp3,
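For context, this change adds the R wrappers; an equivalent sketch of the examples above in the Scala DataFrame API might look like the following, assuming a local SparkSession on a 2.4.0-SNAPSHOT build where the corresponding Scala functions are available:

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, array_position, element_at}

val spark = SparkSession.builder().master("local[*]").appName("array-fns-sketch").getOrCreate()
import spark.implicits._

val df = Seq((21.0, 6.0, 110.0), (22.8, 4.0, 93.0))
  .toDF("mpg", "cyl", "hp")
  .select(array($"mpg", $"cyl", $"hp").as("v1"))

df.select(array_position($"v1", 21.0)).show()  // 1-based index of the value, 0 if absent
df.select(element_at($"v1", 1)).show()         // value at the given 1-based position
```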
spark git commit: [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId
Repository: spark Updated Branches: refs/heads/branch-2.2 041aec4e1 -> e77d62a72 [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId ## What changes were proposed in this pull request? Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66 ## How was this patch tested? N/A Author: seancxmaoCloses #21113 from seancxmao/SPARK-13136. (cherry picked from commit c303b1b6766a3dc5961713f98f62cd7d7ac7972a) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e77d62a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e77d62a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e77d62a7 Branch: refs/heads/branch-2.2 Commit: e77d62a722941ce1cf235861d21b1f73089be134 Parents: 041aec4 Author: seancxmao Authored: Tue Apr 24 16:16:07 2018 +0800 Committer: hyukjinkwon Committed: Tue Apr 24 16:17:02 2018 +0800 -- .../main/scala/org/apache/spark/sql/execution/SQLExecution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e77d62a7/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index be35916..bde7d61 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -94,7 +94,7 @@ object SQLExecution { /** * Wrap an action with a known executionId. When running a different action in a different * thread from the original one, this method can be used to connect the Spark jobs in this action - * with the known executionId, e.g., `BroadcastHashJoin.broadcastFuture`. + * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`. */ def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T = { val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId
Repository: spark Updated Branches: refs/heads/branch-2.3 1c3e8205d -> 096defdd7 [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId ## What changes were proposed in this pull request? Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66 ## How was this patch tested? N/A Author: seancxmaoCloses #21113 from seancxmao/SPARK-13136. (cherry picked from commit c303b1b6766a3dc5961713f98f62cd7d7ac7972a) Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/096defdd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/096defdd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/096defdd Branch: refs/heads/branch-2.3 Commit: 096defdd7bb1d687edff06fffdc6cda2ccd022b3 Parents: 1c3e820 Author: seancxmao Authored: Tue Apr 24 16:16:07 2018 +0800 Committer: hyukjinkwon Committed: Tue Apr 24 16:16:41 2018 +0800 -- .../main/scala/org/apache/spark/sql/execution/SQLExecution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/096defdd/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index e991da7..2c5102b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -88,7 +88,7 @@ object SQLExecution { /** * Wrap an action with a known executionId. When running a different action in a different * thread from the original one, this method can be used to connect the Spark jobs in this action - * with the known executionId, e.g., `BroadcastHashJoin.broadcastFuture`. + * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`. */ def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T = { val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId
Repository: spark Updated Branches: refs/heads/master 281c1ca0d -> c303b1b67 [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId ## What changes were proposed in this pull request? Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66 ## How was this patch tested? N/A Author: seancxmaoCloses #21113 from seancxmao/SPARK-13136. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c303b1b6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c303b1b6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c303b1b6 Branch: refs/heads/master Commit: c303b1b6766a3dc5961713f98f62cd7d7ac7972a Parents: 281c1ca Author: seancxmao Authored: Tue Apr 24 16:16:07 2018 +0800 Committer: hyukjinkwon Committed: Tue Apr 24 16:16:07 2018 +0800 -- .../main/scala/org/apache/spark/sql/execution/SQLExecution.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c303b1b6/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index e991da7..2c5102b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -88,7 +88,7 @@ object SQLExecution { /** * Wrap an action with a known executionId. When running a different action in a different * thread from the original one, this method can be used to connect the Spark jobs in this action - * with the known executionId, e.g., `BroadcastHashJoin.broadcastFuture`. + * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`. */ def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T = { val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
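As a rough illustration of what the corrected comment refers to, here is a standalone sketch of the pattern withExecutionId implements: temporarily setting the execution-id local property around a body that runs in another thread (as BroadcastExchangeExec.relationFuture does), then restoring the previous value. The Properties object below merely stands in for SparkContext local properties; it is not the actual SQLExecution code:

```
import java.util.Properties

val localProps = new Properties()   // stand-in for SparkContext local properties

def withExecutionId[T](executionId: String)(body: => T): T = {
  val oldId = localProps.getProperty("spark.sql.execution.id")
  localProps.setProperty("spark.sql.execution.id", executionId)
  try body
  finally {
    // Restore whatever was there before so the caller's state is unchanged.
    if (oldId == null) localProps.remove("spark.sql.execution.id")
    else localProps.setProperty("spark.sql.execution.id", oldId)
  }
}

val result = withExecutionId("42") {
  s"running with execution id ${localProps.getProperty("spark.sql.execution.id")}"
}
println(result)                                              // running with execution id 42
println(localProps.getProperty("spark.sql.execution.id"))   // null (restored)
```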