svn commit: r26496 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_24_20_01-5fea17b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-24 Thread pwendell
Author: pwendell
Date: Wed Apr 25 03:15:17 2018
New Revision: 26496

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_24_20_01-5fea17b docs


[This commit notification would consist of 1460 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-23821][SQL] Collection function: flatten

2018-04-24 Thread ueshin
Repository: spark
Updated Branches:
  refs/heads/master d6c26d1c9 -> 5fea17b3b


[SPARK-23821][SQL] Collection function: flatten

## What changes were proposed in this pull request?

This PR adds a new collection function that transforms an array of arrays into 
a single array. The PR comprises:
- An expression for flattening array structure
- Flatten function
- A wrapper for PySpark
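
The intended semantics can be sketched on plain Scala collections, without Spark. This is only an illustration: `FlattenSketch.flattenNested` is a hypothetical helper, not the expression added by this PR. The null handling (a null outer array or any null inner array yields null, while null elements are kept) is inferred from the generated code under "Codegen examples".

```scala
// Hypothetical helper illustrating flatten's null semantics on plain Scala collections.
object FlattenSketch {
  def flattenNested[T](arrays: Seq[Seq[T]]): Seq[T] = {
    if (arrays == null || arrays.exists(_ == null)) {
      // A null outer array, or any null inner array, makes the whole result null.
      null
    } else {
      // Null elements inside the inner arrays are kept as they are.
      arrays.flatten
    }
  }
}
```

Returning null rather than an `Option` mirrors the SQL-style null propagation of the generated code; idiomatic Scala outside Spark would more likely use `Option`.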

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(Seq(1, 2), Seq(4, 5)),
  Seq(null, Seq(1))
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(flatten($"i")).debugCodegen
```
Result:
```
boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
ArrayData inputadapter_value = inputadapter_isNull ?
  null : (inputadapter_row.getArray(0));

boolean filter_value = true;

if (!(!inputadapter_isNull)) {
  filter_value = inputadapter_isNull;
}
if (!filter_value) continue;

((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);

boolean project_isNull = inputadapter_isNull;
ArrayData project_value = null;

if (!inputadapter_isNull) {
  for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
    project_isNull |= inputadapter_value.isNullAt(z);
  }
  if (!project_isNull) {
    long project_numElements = 0;
    for (int z = 0; z < inputadapter_value.numElements(); z++) {
      project_numElements += inputadapter_value.getArray(z).numElements();
    }
    if (project_numElements > 2147483632) {
      throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
        project_numElements + " elements due to exceeding the array size limit 2147483632.");
    }

    long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
      project_numElements,
      4);
    if (project_size > 2147483632) {
      throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
        project_size + " bytes of data due to exceeding the limit 2147483632" +
        " bytes for UnsafeArrayData.");
    }

    byte[] project_array = new byte[(int)project_size];
    UnsafeArrayData project_tempArrayData = new UnsafeArrayData();
    Platform.putLong(project_array, 16, project_numElements);
    project_tempArrayData.pointTo(project_array, 16, (int)project_size);
    int project_counter = 0;
    for (int k = 0; k < inputadapter_value.numElements(); k++) {
      ArrayData arr = inputadapter_value.getArray(k);
      for (int l = 0; l < arr.numElements(); l++) {
        if (arr.isNullAt(l)) {
          project_tempArrayData.setNullAt(project_counter);
        } else {
          project_tempArrayData.setInt(
            project_counter,
            arr.getInt(l)
          );
        }
        project_counter++;
      }
    }
    project_value = project_tempArrayData;

  }

}
```
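
The two size checks in the generated code above can be reproduced with plain arithmetic. The sketch below is an assumption-laden illustration, not Spark code: `FlattenSizeSketch` and its methods are hypothetical names. The byte layout (an 8-byte element count, one null bit per element rounded up to 8-byte words, and a value region padded to 8 bytes) is what `UnsafeArrayData.calculateSizeOfUnderlyingByteArray` is assumed to compute.

```scala
// Hypothetical re-creation of the size checks; the layout constants are assumptions.
object FlattenSizeSketch {
  // Limit that appears in the generated code.
  val MaxArrayLength: Long = 2147483632L

  // Assumed header: 8 bytes for the element count plus a null bitmap in 8-byte words.
  def headerBytes(numElements: Long): Long = 8L + ((numElements + 63L) / 64L) * 8L

  // Assumed: the value region is padded to a multiple of 8 bytes.
  def roundToWord(numBytes: Long): Long = ((numBytes + 7L) / 8L) * 8L

  def underlyingByteArraySize(numElements: Long, elementSize: Int): Long =
    headerBytes(numElements) + roundToWord(numElements * elementSize)

  // Returns the byte size, or a message when either limit is exceeded.
  def checkedSize(innerLengths: Seq[Int], elementSize: Int): Either[String, Long] = {
    // Sum in Long so that many large inner arrays cannot overflow an Int.
    val numElements = innerLengths.map(_.toLong).sum
    if (numElements > MaxArrayLength) {
      Left(s"$numElements elements exceed the array size limit $MaxArrayLength")
    } else {
      val size = underlyingByteArraySize(numElements, elementSize)
      if (size > MaxArrayLength) Left(s"$size bytes exceed the limit $MaxArrayLength")
      else Right(size)
    }
  }
}
```

Under these assumptions, two inner arrays of two ints each give `checkedSize(Seq(2, 2), 4) == Right(32)`: 16 header bytes plus 16 value bytes.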
### Non-primitive type
```
val df = Seq(
  Seq(Seq("a", "b"), Seq(null, "d")),
  Seq(null, Seq("a"))
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(flatten($"s")).debugCodegen
```
Result:
```
boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
ArrayData inputadapter_value = inputadapter_isNull ?
  null : (inputadapter_row.getArray(0));

boolean filter_value = true;

if (!(!inputadapter_isNull)) {
  filter_value = inputadapter_isNull;
}
if (!filter_value) continue;

((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);

boolean project_isNull = inputadapter_isNull;
ArrayData project_value = null;

if (!inputadapter_isNull) {
  for (int z = 0; !project_isNull && z < 

spark git commit: [SPARK-24038][SS] Refactor continuous writing to its own class

2018-04-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 7b1e6523a -> d6c26d1c9


[SPARK-24038][SS] Refactor continuous writing to its own class

## What changes were proposed in this pull request?

Refactor continuous writing to its own class.

See WIP https://github.com/jose-torres/spark/pull/13 for the overall direction 
this is going, but I think this PR is very isolated and necessary anyway.

## How was this patch tested?

existing unit tests - refactoring only

Author: Jose Torres 

Closes #21116 from jose-torres/SPARK-24038.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6c26d1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6c26d1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6c26d1c

Branch: refs/heads/master
Commit: d6c26d1c9a8f747a3e0d281a27ea9eb4d92102e5
Parents: 7b1e652
Author: Jose Torres 
Authored: Tue Apr 24 17:06:03 2018 -0700
Committer: Tathagata Das 
Committed: Tue Apr 24 17:06:03 2018 -0700

--
 .../datasources/v2/DataSourceV2Strategy.scala   |   4 +
 .../datasources/v2/WriteToDataSourceV2.scala|  74 +--
 .../continuous/ContinuousExecution.scala|   2 +-
 .../WriteToContinuousDataSource.scala   |  31 +
 .../WriteToContinuousDataSourceExec.scala   | 124 +++
 5 files changed, 165 insertions(+), 70 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 1ac9572..c2a3144 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -32,6 +33,9 @@ object DataSourceV2Strategy extends Strategy {
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 
+    case WriteToContinuousDataSource(writer, query) =>
+      WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
+
     case _ => Nil
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index e80b44c..ea283ed 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -65,25 +65,10 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
       s"The input RDD has ${messages.length} partitions.")
 
     try {
-      val runTask = writer match {
-        // This case means that we're doing continuous processing. In microbatch streaming, the
-        // StreamWriter is wrapped in a MicroBatchWriter, which is executed as a normal batch.
-        case w: StreamWriter =>
-          EpochCoordinatorRef.get(
-            sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-            sparkContext.env)
-            .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
-
-          (context: TaskContext, iter: Iterator[InternalRow]) =>
-            DataWritingSparkTask.runContinuous(writeTask, context, iter)
-        case _ =>
-          (context: TaskContext, iter: Iterator[InternalRow]) =>
-            DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator)
-      }
-
       sparkContext.runJob(
         rdd,
-        runTask,
+        (context: TaskContext, iter: Iterator[InternalRow]) =>
+          DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator),
   

svn commit: r26492 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_24_16_01-7b1e652-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-24 Thread pwendell
Author: pwendell
Date: Tue Apr 24 23:15:37 2018
New Revision: 26492

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_24_16_01-7b1e652 docs


[This commit notification would consist of 1460 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured streaming

2018-04-24 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 379bffa05 -> 7b1e6523a


[SPARK-24056][SS] Make consumer creation lazy in Kafka source for Structured 
streaming

## What changes were proposed in this pull request?

Currently, the driver side of the Kafka source (i.e. KafkaMicroBatchReader) 
eagerly creates a consumer as soon as the KafkaMicroBatchReader is created. 
However, we create a dummy KafkaMicroBatchReader to get the schema and 
immediately stop it. It's better to make the consumer creation lazy: it will be 
created on the first attempt to fetch offsets using the KafkaOffsetReader.
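
The lazy-creation pattern described here can be sketched without Kafka. `LazyResource` is a hypothetical class used only for illustration. It creates the underlying resource on first access and drops it on `close()`, so the next access creates a fresh one, similar in spirit to how `_consumer` is reset to null in the diff for this commit.

```scala
// Hypothetical, Kafka-free sketch of lazy creation with reset.
class LazyResource[T](create: () => T, release: T => Unit) {
  @volatile private var instance: Option[T] = None

  // Creates the resource on first access; later calls reuse it.
  def get: T = synchronized {
    if (instance.isEmpty) instance = Some(create())
    instance.get
  }

  def isCreated: Boolean = instance.isDefined

  // Releases the resource only if it was ever created, then forgets it
  // so that the next call to `get` builds a new one.
  def close(): Unit = synchronized {
    instance.foreach(release)
    instance = None
  }
}
```

A reader that is created only to inspect a schema and then closed never pays for `create()` in this sketch, which is the saving the PR description is after.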

## How was this patch tested?
Existing unit tests

Author: Tathagata Das 

Closes #21134 from tdas/SPARK-24056.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7b1e6523
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7b1e6523
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7b1e6523

Branch: refs/heads/master
Commit: 7b1e6523af3c96043aa8d2763e5f18b6e2781c3d
Parents: 379bffa
Author: Tathagata Das 
Authored: Tue Apr 24 14:33:33 2018 -0700
Committer: Tathagata Das 
Committed: Tue Apr 24 14:33:33 2018 -0700

--
 .../spark/sql/kafka010/KafkaOffsetReader.scala  | 31 +++-
 1 file changed, 17 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7b1e6523/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
--
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 551641c..8206669 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -75,7 +75,17 @@ private[kafka010] class KafkaOffsetReader(
    * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries the
    * offsets and never commits them.
    */
-  protected var consumer = createConsumer()
+  @volatile protected var _consumer: Consumer[Array[Byte], Array[Byte]] = null
+
+  protected def consumer: Consumer[Array[Byte], Array[Byte]] = synchronized {
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    if (_consumer == null) {
+      val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
+      newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
+      _consumer = consumerStrategy.createConsumer(newKafkaParams)
+    }
+    _consumer
+  }
 
   private val maxOffsetFetchAttempts =
     readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
@@ -95,9 +105,7 @@ private[kafka010] class KafkaOffsetReader(
    * Closes the connection to Kafka, and cleans up state.
    */
   def close(): Unit = {
-    runUninterruptibly {
-      consumer.close()
-    }
+    if (_consumer != null) runUninterruptibly { stopConsumer() }
     kafkaReaderThread.shutdown()
   }
 
@@ -304,19 +312,14 @@ private[kafka010] class KafkaOffsetReader(
     }
   }
 
-  /**
-   * Create a consumer using the new generated group id. We always use a new consumer to avoid
-   * just using a broken consumer to retry on Kafka errors, which likely will fail again.
-   */
-  private def createConsumer(): Consumer[Array[Byte], Array[Byte]] = synchronized {
-    val newKafkaParams = new ju.HashMap[String, Object](driverKafkaParams)
-    newKafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, nextGroupId())
-    consumerStrategy.createConsumer(newKafkaParams)
+  private def stopConsumer(): Unit = synchronized {
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+    if (_consumer != null) _consumer.close()
   }
 
   private def resetConsumer(): Unit = synchronized {
-    consumer.close()
-    consumer = createConsumer()
+    stopConsumer()
+    _consumer = null  // will automatically get reinitialized again
   }
 }
 





svn commit: r26486 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_24_12_01-379bffa-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-04-24 Thread pwendell
Author: pwendell
Date: Tue Apr 24 19:15:19 2018
New Revision: 26486

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_04_24_12_01-379bffa docs


[This commit notification would consist of 1460 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]




spark git commit: [SPARK-23990][ML] Instruments logging improvements - ML regression package

2018-04-24 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 83013752e -> 379bffa05


[SPARK-23990][ML] Instruments logging improvements - ML regression package

## What changes were proposed in this pull request?

Instruments logging improvements - ML regression package

This PR adds an `OptionalInstrumentation` class, which is used in 
`WeightedLeastSquares` and `IterativelyReweightedLeastSquares`.
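
One way to picture such a class is a wrapper that logs through an instrumentation object when the caller has one and through an ordinary fallback otherwise. The sketch below rests on that assumption and is not the class added by this PR; `InstrLike`, `OptionalInstrSketch` and `SolverSketch` are hypothetical names.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an instrumentation object; not Spark's Instrumentation class.
trait InstrLike {
  def logInfo(msg: String): Unit
}

// Hypothetical wrapper: log via the instrumentation when present, else via a fallback.
class OptionalInstrSketch(instr: Option[InstrLike], fallback: String => Unit) {
  def logInfo(msg: String): Unit = instr match {
    case Some(i) => i.logInfo(msg)
    case None => fallback(msg)
  }
}

object OptionalInstrSketch {
  // In this sketch, messages logged without instrumentation are collected here.
  val fallbackLog: ArrayBuffer[String] = ArrayBuffer.empty

  def withoutInstr(): OptionalInstrSketch =
    new OptionalInstrSketch(None, msg => { fallbackLog += msg; () })

  def withInstr(instr: InstrLike): OptionalInstrSketch =
    new OptionalInstrSketch(Some(instr), msg => { fallbackLog += msg; () })
}

object SolverSketch {
  // The default argument lets callers that have no instrumentation simply omit it.
  def fit(
      data: Seq[Double],
      instr: OptionalInstrSketch = OptionalInstrSketch.withoutInstr()): Double = {
    instr.logInfo(s"fitting on ${data.size} instances")
    if (data.isEmpty) 0.0 else data.sum / data.size
  }
}
```

The default parameter keeps existing call sites source-compatible, which matches the shape of the new `fit(instances, instr = ...)` signature visible in the diff.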

## How was this patch tested?

N/A

Author: WeichenXu 

Closes #21078 from WeichenXu123/inst_reg.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/379bffa0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/379bffa0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/379bffa0

Branch: refs/heads/master
Commit: 379bffa0525a4343f8c10e51ed192031922f9874
Parents: 8301375
Author: WeichenXu 
Authored: Tue Apr 24 11:02:22 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Apr 24 11:02:22 2018 -0700

--
 .../ml/classification/LogisticRegression.scala  |  4 +-
 .../IterativelyReweightedLeastSquares.scala | 18 --
 .../spark/ml/optim/WeightedLeastSquares.scala   | 32 +
 .../ml/regression/AFTSurvivalRegression.scala   |  2 +-
 .../GeneralizedLinearRegression.scala   | 14 ++--
 .../spark/ml/regression/LinearRegression.scala  | 22 ---
 .../spark/ml/tree/impl/RandomForest.scala   |  2 +
 .../apache/spark/ml/util/Instrumentation.scala  | 68 +++-
 8 files changed, 125 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index e426263..06ca37b 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -500,7 +500,7 @@ class LogisticRegression @Since("1.2.0") (
 
 if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
-val instr = Instrumentation.create(this, instances)
+val instr = Instrumentation.create(this, dataset)
 instr.logParams(regParam, elasticNetParam, standardization, threshold,
   maxIter, tol, fitIntercept)
 
@@ -816,7 +816,7 @@ class LogisticRegression @Since("1.2.0") (
 
 if (state == null) {
   val msg = s"${optimizer.getClass.getName} failed."
-  logError(msg)
+  instr.logError(msg)
   throw new SparkException(msg)
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/379bffa0/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
index 6961b45..572b8cf 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/optim/IterativelyReweightedLeastSquares.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.ml.optim
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.ml.feature.{Instance, OffsetInstance}
 import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.util.OptionalInstrumentation
 import org.apache.spark.rdd.RDD
 
 /**
@@ -61,9 +61,12 @@ private[ml] class IterativelyReweightedLeastSquares(
     val fitIntercept: Boolean,
     val regParam: Double,
     val maxIter: Int,
-    val tol: Double) extends Logging with Serializable {
+    val tol: Double) extends Serializable {
 
-  def fit(instances: RDD[OffsetInstance]): IterativelyReweightedLeastSquaresModel = {
+  def fit(
+      instances: RDD[OffsetInstance],
+      instr: OptionalInstrumentation = OptionalInstrumentation.create(
+        classOf[IterativelyReweightedLeastSquares])): IterativelyReweightedLeastSquaresModel = {
 
     var converged = false
     var iter = 0
@@ -83,7 +86,8 @@ private[ml] class IterativelyReweightedLeastSquares(
 
       // Estimate new model
       model = new WeightedLeastSquares(fitIntercept, regParam, elasticNetParam = 0.0,
-        standardizeFeatures = false, standardizeLabel = false).fit(newInstances)
+        standardizeFeatures = false, standardizeLabel = false)
+        .fit(newInstances, instr = instr)
 
       // Check convergence
       val oldCoefficients = oldModel.coefficients
@@ -96,14 +100,14 @@ private[ml] 

spark git commit: [SPARK-23455][ML] Default Params in ML should be saved separately in metadata

2018-04-24 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master ce7ba2e98 -> 83013752e


[SPARK-23455][ML] Default Params in ML should be saved separately in metadata

## What changes were proposed in this pull request?

We save ML's user-supplied params and default params as one entity in metadata. 
When loading saved models, we set all the loaded params on the created ML 
model instances as user-supplied params.

This causes some problems. For example, if we strictly disallow some params 
from being set at the same time, a default param can fail the param check 
because it is treated as a user-supplied param after loading.

The loaded default params should not be set as user-supplied params. We should 
save ML default params separately in metadata.

For backward compatibility, when loading metadata, if it is a metadata file 
from a previous Spark version, we shouldn't raise an error if we can't find the 
default param field.
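
A minimal sketch of the idea, using an in-memory stand-in for the metadata file. `ParamMetadata` and `ParamStore` are hypothetical names, and string-valued params are a simplification; the actual metadata format is not shown in this message.

```scala
// Hypothetical metadata shape: defaults live in their own field, which older files lack.
final case class ParamMetadata(
    paramMap: Map[String, String],
    defaultParamMap: Option[Map[String, String]])

// Hypothetical param holder that keeps user-supplied and default values apart.
final class ParamStore {
  private var userSupplied = Map.empty[String, String]
  private var defaults = Map.empty[String, String]

  def set(name: String, value: String): Unit = userSupplied += (name -> value)
  def setDefault(name: String, value: String): Unit = defaults += (name -> value)

  // True only for params the user set explicitly.
  def isSet(name: String): Boolean = userSupplied.contains(name)

  // User-supplied values win over defaults.
  def get(name: String): Option[String] = userSupplied.get(name).orElse(defaults.get(name))

  def save: ParamMetadata = ParamMetadata(userSupplied, Some(defaults))
}

object ParamStore {
  def load(meta: ParamMetadata): ParamStore = {
    val store = new ParamStore
    meta.paramMap.foreach { case (k, v) => store.set(k, v) }
    // Metadata from an older version has no default field; treat it as empty, not as an error.
    meta.defaultParamMap.getOrElse(Map.empty[String, String])
      .foreach { case (k, v) => store.setDefault(k, v) }
    store
  }
}
```

After a save and load round trip, a default stays a default: `isSet` is false for it, so a check that forbids two params from being set together is not tripped by values the user never supplied.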

## How was this patch tested?

Pass existing tests and added tests.

Author: Liang-Chi Hsieh 

Closes #20633 from viirya/save-ml-default-params.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83013752
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83013752
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83013752

Branch: refs/heads/master
Commit: 83013752e3cfcbc3edeef249439ac20b143eeabc
Parents: ce7ba2e
Author: Liang-Chi Hsieh 
Authored: Tue Apr 24 10:40:25 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Apr 24 10:40:25 2018 -0700

--
 .../classification/DecisionTreeClassifier.scala |   2 +-
 .../spark/ml/classification/GBTClassifier.scala |   4 +-
 .../spark/ml/classification/LinearSVC.scala |   2 +-
 .../ml/classification/LogisticRegression.scala  |   2 +-
 .../MultilayerPerceptronClassifier.scala|   2 +-
 .../spark/ml/classification/NaiveBayes.scala|   2 +-
 .../spark/ml/classification/OneVsRest.scala |   4 +-
 .../classification/RandomForestClassifier.scala |   4 +-
 .../spark/ml/clustering/BisectingKMeans.scala   |   2 +-
 .../spark/ml/clustering/GaussianMixture.scala   |   2 +-
 .../org/apache/spark/ml/clustering/KMeans.scala |   2 +-
 .../org/apache/spark/ml/clustering/LDA.scala|   4 +-
 .../feature/BucketedRandomProjectionLSH.scala   |   2 +-
 .../apache/spark/ml/feature/Bucketizer.scala|  24 
 .../apache/spark/ml/feature/ChiSqSelector.scala |   2 +-
 .../spark/ml/feature/CountVectorizer.scala  |   2 +-
 .../scala/org/apache/spark/ml/feature/IDF.scala |   2 +-
 .../org/apache/spark/ml/feature/Imputer.scala   |   2 +-
 .../apache/spark/ml/feature/MaxAbsScaler.scala  |   2 +-
 .../apache/spark/ml/feature/MinHashLSH.scala|   2 +-
 .../apache/spark/ml/feature/MinMaxScaler.scala  |   2 +-
 .../ml/feature/OneHotEncoderEstimator.scala |   2 +-
 .../scala/org/apache/spark/ml/feature/PCA.scala |   2 +-
 .../spark/ml/feature/QuantileDiscretizer.scala  |  24 
 .../org/apache/spark/ml/feature/RFormula.scala  |   6 +-
 .../spark/ml/feature/StandardScaler.scala   |   2 +-
 .../apache/spark/ml/feature/StringIndexer.scala |   2 +-
 .../apache/spark/ml/feature/VectorIndexer.scala |   2 +-
 .../org/apache/spark/ml/feature/Word2Vec.scala  |   2 +-
 .../org/apache/spark/ml/fpm/FPGrowth.scala  |   2 +-
 .../org/apache/spark/ml/param/params.scala  |  13 +-
 .../apache/spark/ml/recommendation/ALS.scala|   2 +-
 .../ml/regression/AFTSurvivalRegression.scala   |   2 +-
 .../ml/regression/DecisionTreeRegressor.scala   |   2 +-
 .../spark/ml/regression/GBTRegressor.scala  |   4 +-
 .../GeneralizedLinearRegression.scala   |   2 +-
 .../ml/regression/IsotonicRegression.scala  |   2 +-
 .../spark/ml/regression/LinearRegression.scala  |   2 +-
 .../ml/regression/RandomForestRegressor.scala   |   4 +-
 .../apache/spark/ml/tuning/CrossValidator.scala |   6 +-
 .../spark/ml/tuning/TrainValidationSplit.scala  |   6 +-
 .../org/apache/spark/ml/util/ReadWrite.scala| 130 ---
 .../spark/ml/util/DefaultReadWriteTest.scala|  73 ++-
 project/MimaExcludes.scala  |   6 +
 44 files changed, 223 insertions(+), 147 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/83013752/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
index 771cd4f..57797d1 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
+++ 

spark git commit: [SPARK-23807][BUILD] Add Hadoop 3.1 profile with relevant POM fix ups

2018-04-24 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 2a24c481d -> ce7ba2e98


[SPARK-23807][BUILD] Add Hadoop 3.1 profile with relevant POM fix ups

## What changes were proposed in this pull request?

1. Adds a `hadoop-3.1` profile build depending on the hadoop-3.1 artifacts.
1. In the hadoop-cloud module, adds an explicit hadoop-3.1 profile which 
switches from explicitly pulling in cloud connectors (hadoop-openstack, 
hadoop-aws, hadoop-azure) to depending on the hadoop-cloudstorage POM artifact, 
which pulls these in, has pre-excluded things like hadoop-common, and stays up 
to date with new connectors (hadoop-azuredatalake, hadoop-aliyun). Goal: it 
becomes the Hadoop project's homework to keep this clean, and the Spark 
project doesn't need to handle new Hadoop releases adding more dependencies.
1. the hadoop-cloud/hadoop-3.1 profile also declares support for jetty-ajax and 
jetty-util to ensure that these jars get into the distribution jar directory 
when needed by unshaded libraries.
1. Increases the curator and zookeeper versions to match those in hadoop-3, 
fixing spark core to build in sbt with the hadoop-3 dependencies.

## How was this patch tested?

* Everything has been built and tested against both ASF Hadoop branch-3.1 
and Hadoop trunk.
* spark-shell was used to create connectors to all the stores and verify that 
file IO could take place.

The spark hive-1.2.1 JAR has problems here, as its version check logic fails 
for Hadoop versions > 2.

This can be avoided with either of:

* The Hadoop JARs built to declare their version as Hadoop 2.11: `mvn install 
-DskipTests -DskipShade -Ddeclared.hadoop.version=2.11`. This is safe for 
local test runs, not for deployment (HDFS is very strict about cross-version 
deployment).
* A modified version of spark hive whose version check switch statement is 
happy with Hadoop 3.

I've done both, with maven and SBT.

Three issues surfaced:

1. A spark-core test failure, fixed in SPARK-23787.
1. SBT only: Zookeeper not being found in spark-core. Somehow curator 2.12.0 
triggers some slightly different dependency resolution logic from previous 
versions, and Ivy was missing zookeeper.jar entirely. This patch adds the 
explicit declaration for all spark profiles, setting the ZK version = 3.4.9 for 
hadoop-3.1.
1. Marking jetty-util as provided in spark was stopping hadoop-azure from 
being able to instantiate the azure wasb:// client; it was using 
jetty-util-ajax, which could then not find a class in jetty-util.

Author: Steve Loughran 

Closes #20923 from steveloughran/cloud/SPARK-23807-hadoop-31.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce7ba2e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce7ba2e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce7ba2e9

Branch: refs/heads/master
Commit: ce7ba2e98e0a3b038e881c271b5905058c43155b
Parents: 2a24c48
Author: Steve Loughran 
Authored: Tue Apr 24 09:57:09 2018 -0700
Committer: Marcelo Vanzin 
Committed: Tue Apr 24 09:57:09 2018 -0700

--
 assembly/pom.xml   |   8 ++
 core/pom.xml   |   6 +
 dev/deps/spark-deps-hadoop-3.1 | 221 
 dev/test-dependencies.sh   |   1 +
 hadoop-cloud/pom.xml   |  83 +-
 pom.xml|   9 ++
 6 files changed, 327 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ce7ba2e9/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index a207dae..9608c96 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -254,6 +254,14 @@
   spark-hadoop-cloud_${scala.binary.version}
   ${project.version}
 
+
+
+  org.eclipse.jetty
+  jetty-util
+  ${hadoop.deps.scope}
+
   
 
   

http://git-wip-us.apache.org/repos/asf/spark/blob/ce7ba2e9/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 9258a85..093a986 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -95,6 +95,12 @@
   org.apache.curator
   curator-recipes
 
+
+
+  org.apache.zookeeper
+  zookeeper
+
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ce7ba2e9/dev/deps/spark-deps-hadoop-3.1
--
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
new file mode 100644
index 000..97ad65a
--- /dev/null
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -0,0 +1,221 @@
+HikariCP-java7-2.4.12.jar
+JavaEWAH-0.3.2.jar
+RoaringBitmap-0.5.11.jar

spark git commit: [SPARK-23975][ML] Allow Clustering to take Arrays of Double as input features

2018-04-24 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 55c4ca88a -> 2a24c481d


[SPARK-23975][ML] Allow Clustering to take Arrays of Double as input features

## What changes were proposed in this pull request?

- Support for multiple input types is added in validateAndTransformSchema() and 
computeCost() when checking the column type

- Add an if statement in transform() to support array type as featuresCol

- Add a case statement in fit() when selecting columns from the dataset

These changes will be applied to KMeans first, then to other clustering methods
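
The kind of type dispatch involved can be sketched without Spark. `FeatureColumnSketch.toDoubleArray` is a hypothetical helper that accepts the array types named in the schema check of this commit's diff (arrays of `Double` or `Float`) and rejects anything else. The real code works on DataFrame columns rather than raw values.

```scala
// Hypothetical, Spark-free sketch of accepting several feature representations.
object FeatureColumnSketch {
  def toDoubleArray(value: Any): Array[Double] = value match {
    // Already in the target representation.
    case doubles: Array[Double] => doubles
    // Widen each float to a double.
    case floats: Array[Float] => floats.map(_.toDouble)
    case other =>
      throw new IllegalArgumentException(
        s"Unsupported feature type: ${if (other == null) "null" else other.getClass.getName}")
  }
}
```

Matching on `Array[Double]` versus `Array[Float]` works at runtime because JVM arrays keep their element type, unlike generic collections.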

## How was this patch tested?

unit test is added


Author: Lu WANG 

Closes #21081 from ludatabricks/SPARK-23975.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a24c481
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a24c481
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a24c481

Branch: refs/heads/master
Commit: 2a24c481da3f30b510deb62e5cf21c9463cf250c
Parents: 55c4ca8
Author: Lu WANG 
Authored: Tue Apr 24 09:25:41 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Apr 24 09:25:41 2018 -0700

--
 .../org/apache/spark/ml/clustering/KMeans.scala | 32 +++---
 .../org/apache/spark/ml/util/DatasetUtils.scala | 63 
 .../spark/ml/clustering/KMeansSuite.scala   | 38 
 3 files changed, 126 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2a24c481/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index 1ad157a..d475c72 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -33,8 +33,8 @@ import org.apache.spark.mllib.linalg.{Vector => OldVector, 
Vectors => OldVectors
 import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.functions.{col, udf}
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.functions.udf
+import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, 
IntegerType, StructType}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.VersionUtils.majorVersion
 
@@ -87,12 +87,23 @@ private[clustering] trait KMeansParams extends Params with 
HasMaxIter with HasFe
   def getInitSteps: Int = $(initSteps)
 
   /**
+   * Validates the input schema.
+   * @param schema input schema
+   */
+  private[clustering] def validateSchema(schema: StructType): Unit = {
+val typeCandidates = List( new VectorUDT,
+  new ArrayType(DoubleType, false),
+  new ArrayType(FloatType, false))
+
+SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates)
+  }
+  /**
* Validates and transforms the input schema.
* @param schema input schema
* @return output schema
*/
   protected def validateAndTransformSchema(schema: StructType): StructType = {
-SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT)
+validateSchema(schema)
 SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)
   }
 }
@@ -125,8 +136,11 @@ class KMeansModel private[ml] (
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
+
 val predictUDF = udf((vector: Vector) => predict(vector))
-dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
+
+dataset.withColumn($(predictionCol),
+  predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
   }
 
   @Since("1.5.0")
@@ -146,8 +160,10 @@ class KMeansModel private[ml] (
   // TODO: Replace the temp fix when we have proper evaluators defined for 
clustering.
   @Since("2.0.0")
   def computeCost(dataset: Dataset[_]): Double = {
-SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
-val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map {
+validateSchema(dataset.schema)
+
+val data: RDD[OldVector] = 
dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol))
+  .rdd.map {
   case Row(point: Vector) => OldVectors.fromML(point)
 }
 parentModel.computeCost(data)
@@ -335,7 +351,9 @@ class KMeans @Since("1.5.0") (
 transformSchema(dataset.schema, logging = true)
 
 val handlePersistence = dataset.storageLevel == 

spark git commit: [SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation

2018-04-24 Thread tgraves
Repository: spark
Updated Branches:
  refs/heads/master 4926a7c2f -> 55c4ca88a


[SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation

## What changes were proposed in this pull request?

By default, dynamic allocation requests enough executors to maximize
parallelism for the number of tasks to process. While this minimizes job
latency, with small tasks it can waste a lot of resources because of executor
allocation overhead, as some executors might not do any work at all.
This setting lets you set a ratio that reduces the number of target
executors relative to full parallelism.

The number of executors computed with this setting is still bounded by
`spark.dynamicAllocation.maxExecutors` and `spark.dynamicAllocation.minExecutors`.
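
As a rough illustration of the arithmetic described above, the sketch below computes a target executor count from a task backlog. The names `AllocationSketch` and `targetExecutors`, and the choice to round up, are assumptions made for this example and are not taken from the patch; only the `(0, 1]` range check on the ratio and the min/max bounds come from the description and the diff.

```scala
object AllocationSketch {
  /** Hypothetical helper: number of executors to request for `pendingTasks`. */
  def targetExecutors(
      pendingTasks: Int,
      tasksPerExecutor: Int,
      ratio: Double,
      minExecutors: Int,
      maxExecutors: Int): Int = {
    require(ratio > 0.0 && ratio <= 1.0, "ratio must be in (0, 1]")
    require(tasksPerExecutor > 0, "tasksPerExecutor must be positive")
    // Full parallelism needs pendingTasks / tasksPerExecutor executors;
    // the ratio scales that down. Rounding up is an assumption of this sketch.
    val scaled = math.ceil(pendingTasks * ratio / tasksPerExecutor).toInt
    // The scaled value is still kept within the configured bounds.
    math.max(minExecutors, math.min(maxExecutors, scaled))
  }

  def main(args: Array[String]): Unit = {
    println(targetExecutors(100, 4, 1.0, 1, 50))  // 25: full parallelism
    println(targetExecutors(100, 4, 0.5, 1, 50))  // 13: half, rounded up
    println(targetExecutors(100, 4, 0.5, 20, 50)) // 20: raised to the minimum
  }
}
```

With a ratio of 1.0 the result matches full parallelism, so the sketch behaves like the pre-existing default in that case.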

## How was this patch tested?
Unit tests, plus runs on various real workloads on a YARN cluster.

Author: Julien Cuquemelle 

Closes #19881 from jcuquemelle/AddTaskPerExecutorSlot.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55c4ca88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55c4ca88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55c4ca88

Branch: refs/heads/master
Commit: 55c4ca88a3b093ee197a8689631be8d1fac1f10f
Parents: 4926a7c
Author: Julien Cuquemelle 
Authored: Tue Apr 24 10:56:55 2018 -0500
Committer: Thomas Graves 
Committed: Tue Apr 24 10:56:55 2018 -0500

--
 .../spark/ExecutorAllocationManager.scala   | 24 +++---
 .../apache/spark/internal/config/package.scala  |  4 +++
 .../spark/ExecutorAllocationManagerSuite.scala  | 33 
 docs/configuration.md   | 18 +++
 4 files changed, 74 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/55c4ca88/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 189d913..aa363ee 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, 
DYN_ALLOCATION_MIN_EXECUTORS}
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMaster
@@ -69,6 +69,10 @@ import org.apache.spark.util.{Clock, SystemClock, 
ThreadUtils, Utils}
  *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of 
executors
  *   spark.dynamicAllocation.initialExecutors - Number of executors to start 
with
  *
+ *   spark.dynamicAllocation.executorAllocationRatio -
+ * This is used to reduce the parallelism of the dynamic allocation that 
can waste
+ * resources when tasks are small
+ *
  *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
  * If there are backlogged tasks for this duration, add new executors
  *
@@ -116,9 +120,12 @@ private[spark] class ExecutorAllocationManager(
   // TODO: The default value of 1 for spark.executor.cores works right now 
because dynamic
   // allocation is only supported for YARN and the default number of cores per 
executor in YARN is
   // 1, but it might need to be attained differently for different cluster 
managers
-  private val tasksPerExecutor =
+  private val tasksPerExecutorForFullParallelism =
 conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
 
+  private val executorAllocationRatio =
+conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
+
   validateSettings()
 
   // Number of executors to add in the next round
@@ -209,8 +216,13 @@ private[spark] class ExecutorAllocationManager(
   throw new SparkException("Dynamic allocation of executors requires the 
external " +
 "shuffle service. You may enable this through 
spark.shuffle.service.enabled.")
 }
-if (tasksPerExecutor == 0) {
-  throw new SparkException("spark.executor.cores must not be less than 
spark.task.cpus.")
+if (tasksPerExecutorForFullParallelism == 0) {
+  throw new SparkException("spark.executor.cores must not be < 
spark.task.cpus.")
+}
+
+if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
+  throw new SparkException(
+

spark git commit: [SPARK-23589][SQL][FOLLOW-UP] Reuse InternalRow in ExternalMapToCatalyst eval

2018-04-24 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 87e8a572b -> 4926a7c2f


[SPARK-23589][SQL][FOLLOW-UP] Reuse InternalRow in ExternalMapToCatalyst eval

## What changes were proposed in this pull request?
This PR is a follow-up to #20980. It changes the code to reuse a single `InternalRow`
when converting input keys/values in `ExternalMapToCatalyst` eval.
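
The reuse idea can be sketched in plain Scala without any Spark dependency. `SingleSlotRow` and `convertAll` below are hypothetical stand-ins invented for this example (they are not Spark's `InternalRow` or the patched method); the point is only that one buffer is allocated up front and updated for each element instead of building a new row per key or value.

```scala
object RowReuseSketch {
  // Hypothetical one-field mutable row; a stand-in, not Spark's InternalRow.
  final class SingleSlotRow {
    private var slot: Any = null
    def update(value: Any): Unit = { slot = value }
    def get: Any = slot
  }

  /** Converts every input with `convert`, reusing one row buffer throughout. */
  def convertAll(inputs: Seq[Any], convert: SingleSlotRow => Any): Array[Any] = {
    val rowBuffer = new SingleSlotRow // allocated once, not once per element
    val out = new Array[Any](inputs.size)
    var i = 0
    for (in <- inputs) {
      rowBuffer.update(in)        // overwrite the single slot in place
      out(i) = convert(rowBuffer) // the converter reads the slot immediately
      i += 1
    }
    out
  }

  def main(args: Array[String]): Unit = {
    val lengths = convertAll(Seq("a", "bb", "ccc"), row => row.get.toString.length)
    println(lengths.mkString(",")) // 1,2,3
  }
}
```

This pattern is only safe when the converter does not keep a reference to the row after it returns, since the next iteration overwrites the slot.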

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro 

Closes #21137 from maropu/SPARK-23589-FOLLOWUP.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4926a7c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4926a7c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4926a7c2

Branch: refs/heads/master
Commit: 4926a7c2f0a47b562f99dbb4f1ca17adb3192061
Parents: 87e8a57
Author: Takeshi Yamamuro 
Authored: Tue Apr 24 17:52:05 2018 +0200
Committer: Herman van Hovell 
Committed: Tue Apr 24 17:52:05 2018 +0200

--
 .../catalyst/expressions/objects/objects.scala  | 92 +++-
 1 file changed, 50 insertions(+), 42 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4926a7c2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 9c7e764..f974fd8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1255,53 +1255,61 @@ case class ExternalMapToCatalyst private(
   override def dataType: MapType = MapType(
 keyConverter.dataType, valueConverter.dataType, valueContainsNull = 
valueConverter.nullable)
 
-  private lazy val mapCatalystConverter: Any => (Array[Any], Array[Any]) = 
child.dataType match {
-case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) 
=>
-  (input: Any) => {
-val data = input.asInstanceOf[java.util.Map[Any, Any]]
-val keys = new Array[Any](data.size)
-val values = new Array[Any](data.size)
-val iter = data.entrySet().iterator()
-var i = 0
-while (iter.hasNext) {
-  val entry = iter.next()
-  val (key, value) = (entry.getKey, entry.getValue)
-  keys(i) = if (key != null) {
-keyConverter.eval(InternalRow.fromSeq(key :: Nil))
-  } else {
-throw new RuntimeException("Cannot use null as map key!")
-  }
-  values(i) = if (value != null) {
-valueConverter.eval(InternalRow.fromSeq(value :: Nil))
-  } else {
-null
+  private lazy val mapCatalystConverter: Any => (Array[Any], Array[Any]) = {
+val rowBuffer = InternalRow.fromSeq(Array[Any](1))
+def rowWrapper(data: Any): InternalRow = {
+  rowBuffer.update(0, data)
+  rowBuffer
+}
+
+child.dataType match {
+  case ObjectType(cls) if classOf[java.util.Map[_, 
_]].isAssignableFrom(cls) =>
+(input: Any) => {
+  val data = input.asInstanceOf[java.util.Map[Any, Any]]
+  val keys = new Array[Any](data.size)
+  val values = new Array[Any](data.size)
+  val iter = data.entrySet().iterator()
+  var i = 0
+  while (iter.hasNext) {
+val entry = iter.next()
+val (key, value) = (entry.getKey, entry.getValue)
+keys(i) = if (key != null) {
+  keyConverter.eval(rowWrapper(key))
+} else {
+  throw new RuntimeException("Cannot use null as map key!")
+}
+values(i) = if (value != null) {
+  valueConverter.eval(rowWrapper(value))
+} else {
+  null
+}
+i += 1
   }
-  i += 1
+  (keys, values)
 }
-(keys, values)
-  }
 
-case ObjectType(cls) if classOf[scala.collection.Map[_, 
_]].isAssignableFrom(cls) =>
-  (input: Any) => {
-val data = input.asInstanceOf[scala.collection.Map[Any, Any]]
-val keys = new Array[Any](data.size)
-val values = new Array[Any](data.size)
-var i = 0
-for ((key, value) <- data) {
-  keys(i) = if (key != null) {
-keyConverter.eval(InternalRow.fromSeq(key :: Nil))
-  } else {
-throw new RuntimeException("Cannot use null as map key!")
-  }
-  values(i) = if (value != null) {
-valueConverter.eval(InternalRow.fromSeq(value :: Nil))
-  } else {

spark git commit: [SPARK-24054][R] Add array_position function / element_at functions

2018-04-24 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master c303b1b67 -> 87e8a572b


[SPARK-24054][R] Add array_position function / element_at functions

## What changes were proposed in this pull request?

This PR adds `array_position` and `element_at` to the R side as well.

array_position:

```r
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
mutated <- mutate(df, v1 = create_array(df$gear, df$am, df$carb))
head(select(mutated, array_position(mutated$v1, 1)))
```

```
  array_position(v1, 1.0)
1   2
2   2
3   2
4   3
5   0
6   3
```

element_at:

```r
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
mutated <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
head(select(mutated, element_at(mutated$v1, 1)))
```

```
  element_at(v1, 1.0)
1                21.0
2                21.0
3                22.8
4                21.4
5                18.7
6                18.1
```

```r
df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
mutated <- mutate(df, v1 = create_map(df$model, df$cyl))
head(select(mutated, element_at(mutated$v1, "Valiant")))
```

```
  element_at(v3, Valiant)
1  NA
2  NA
3  NA
4  NA
5  NA
6   6
```
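
The semantics visible in the outputs above (1-based positions, `0` when the value is absent, `NA` for a missing map key) can be mimicked in plain Scala. This is a minimal sketch, not the Spark implementation: the names `arrayPosition`, `elementAtIndex`, and `elementAtKey` are made up for the example, `None` stands in for `NA`, and only positive indices are handled.

```scala
object CollectionSemanticsSketch {
  /** 1-based position of the first match, or 0 when the value is absent. */
  def arrayPosition[A](xs: Seq[A], value: A): Long =
    xs.indexOf(value) + 1L

  /** Element at a 1-based index; None when the index is past the end. */
  def elementAtIndex[A](xs: Seq[A], index: Int): Option[A] = {
    require(index > 0, "this sketch only handles positive, 1-based indices")
    xs.lift(index - 1)
  }

  /** Map lookup; None plays the role of the NA shown for missing keys. */
  def elementAtKey[K, V](m: Map[K, V], key: K): Option[V] = m.get(key)

  def main(args: Array[String]): Unit = {
    // Rows shaped like (gear, am, carb) from the first example.
    println(arrayPosition(Seq(4.0, 1.0, 4.0), 1.0)) // 2
    println(arrayPosition(Seq(3.0, 0.0, 2.0), 1.0)) // 0
    // A row shaped like (mpg, cyl, hp) from the second example.
    println(elementAtIndex(Seq(21.0, 6.0, 110.0), 1)) // Some(21.0)
    // Single-entry maps shaped like (model -> cyl) from the third example.
    println(elementAtKey(Map("Mazda RX4" -> 6.0), "Valiant")) // None
    println(elementAtKey(Map("Valiant" -> 6.0), "Valiant"))   // Some(6.0)
  }
}
```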

## How was this patch tested?

Unit tests were added in `R/pkg/tests/fulltests/test_sparkSQL.R` and manually 
tested. Documentation was manually built and verified.

Author: hyukjinkwon 

Closes #21130 from HyukjinKwon/sparkr_array_position_element_at.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/87e8a572
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/87e8a572
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/87e8a572

Branch: refs/heads/master
Commit: 87e8a572be14381da9081365d9aa2cbf3253a32c
Parents: c303b1b
Author: hyukjinkwon 
Authored: Tue Apr 24 16:18:20 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Apr 24 16:18:20 2018 +0800

--
 R/pkg/NAMESPACE   |  2 ++
 R/pkg/R/functions.R   | 42 --
 R/pkg/R/generics.R|  8 ++
 R/pkg/tests/fulltests/test_sparkSQL.R | 13 +++--
 4 files changed, 61 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/87e8a572/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 190c50e..55dec17 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -201,6 +201,7 @@ exportMethods("%<=>%",
   "approxCountDistinct",
   "approxQuantile",
   "array_contains",
+  "array_position",
   "asc",
   "ascii",
   "asin",
@@ -245,6 +246,7 @@ exportMethods("%<=>%",
   "decode",
   "dense_rank",
   "desc",
+  "element_at",
   "encode",
   "endsWith",
   "exp",

http://git-wip-us.apache.org/repos/asf/spark/blob/87e8a572/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index a527426..7b3aa05 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -189,6 +189,11 @@ NULL
 #'  the map or array of maps.
 #'  \item \code{from_json}: it is the column containing the JSON 
string.
 #'  }
+#' @param value A value to compute on.
+#'  \itemize{
+#'  \item \code{array_contains}: a value to be checked if contained in 
the column.
+#'  \item \code{array_position}: a value to locate in the given array.
+#'  }
 #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, 
this contains
 #'additional named properties to control how it is converted, 
accepts the same
 #'options as the JSON data source.
@@ -201,6 +206,7 @@ NULL
 #' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
 #' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
 #' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
+#' head(select(tmp, array_position(tmp$v1, 21)))
 #' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
 #' head(tmp2)
 #' head(select(tmp, posexplode(tmp$v1)))
@@ -208,7 +214,8 @@ NULL
 #' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))
 #' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
 #' head(select(tmp3, map_keys(tmp3$v3)))
-#' head(select(tmp3, map_values(tmp3$v3)))}
+#' head(select(tmp3, map_values(tmp3$v3)))
+#' head(select(tmp3, 

spark git commit: [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

2018-04-24 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 041aec4e1 -> e77d62a72


[MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

## What changes were proposed in this pull request?
Fix comment. Change `BroadcastHashJoin.broadcastFuture` to 
`BroadcastExchangeExec.relationFuture`: 
https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66

## How was this patch tested?
N/A

Author: seancxmao 

Closes #21113 from seancxmao/SPARK-13136.

(cherry picked from commit c303b1b6766a3dc5961713f98f62cd7d7ac7972a)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e77d62a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e77d62a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e77d62a7

Branch: refs/heads/branch-2.2
Commit: e77d62a722941ce1cf235861d21b1f73089be134
Parents: 041aec4
Author: seancxmao 
Authored: Tue Apr 24 16:16:07 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Apr 24 16:17:02 2018 +0800

--
 .../main/scala/org/apache/spark/sql/execution/SQLExecution.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e77d62a7/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index be35916..bde7d61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -94,7 +94,7 @@ object SQLExecution {
   /**
* Wrap an action with a known executionId. When running a different action 
in a different
* thread from the original one, this method can be used to connect the 
Spark jobs in this action
-   * with the known executionId, e.g., `BroadcastHashJoin.broadcastFuture`.
+   * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`.
*/
   def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T 
= {
 val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)





spark git commit: [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

2018-04-24 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1c3e8205d -> 096defdd7


[MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

## What changes were proposed in this pull request?
Fix comment. Change `BroadcastHashJoin.broadcastFuture` to 
`BroadcastExchangeExec.relationFuture`: 
https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66

## How was this patch tested?
N/A

Author: seancxmao 

Closes #21113 from seancxmao/SPARK-13136.

(cherry picked from commit c303b1b6766a3dc5961713f98f62cd7d7ac7972a)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/096defdd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/096defdd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/096defdd

Branch: refs/heads/branch-2.3
Commit: 096defdd7bb1d687edff06fffdc6cda2ccd022b3
Parents: 1c3e820
Author: seancxmao 
Authored: Tue Apr 24 16:16:07 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Apr 24 16:16:41 2018 +0800

--
 .../main/scala/org/apache/spark/sql/execution/SQLExecution.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/096defdd/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index e991da7..2c5102b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -88,7 +88,7 @@ object SQLExecution {
   /**
* Wrap an action with a known executionId. When running a different action 
in a different
* thread from the original one, this method can be used to connect the 
Spark jobs in this action
-   * with the known executionId, e.g., `BroadcastHashJoin.broadcastFuture`.
+   * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`.
*/
   def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T 
= {
 val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)





spark git commit: [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

2018-04-24 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 281c1ca0d -> c303b1b67


[MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

## What changes were proposed in this pull request?
Fix comment. Change `BroadcastHashJoin.broadcastFuture` to 
`BroadcastExchangeExec.relationFuture`: 
https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66

## How was this patch tested?
N/A

Author: seancxmao 

Closes #21113 from seancxmao/SPARK-13136.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c303b1b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c303b1b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c303b1b6

Branch: refs/heads/master
Commit: c303b1b6766a3dc5961713f98f62cd7d7ac7972a
Parents: 281c1ca
Author: seancxmao 
Authored: Tue Apr 24 16:16:07 2018 +0800
Committer: hyukjinkwon 
Committed: Tue Apr 24 16:16:07 2018 +0800

--
 .../main/scala/org/apache/spark/sql/execution/SQLExecution.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c303b1b6/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index e991da7..2c5102b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -88,7 +88,7 @@ object SQLExecution {
   /**
* Wrap an action with a known executionId. When running a different action 
in a different
* thread from the original one, this method can be used to connect the 
Spark jobs in this action
-   * with the known executionId, e.g., `BroadcastHashJoin.broadcastFuture`.
+   * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`.
*/
   def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T 
= {
 val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)

