spark git commit: [SPARK-16968][SQL][BACKPORT-2.0] Add additional options in jdbc when creating a new table

2017-01-18 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 ee4e8faff -> 9fc053c30


[SPARK-16968][SQL][BACKPORT-2.0] Add additional options in jdbc when creating a 
new table

### What changes were proposed in this pull request?
This PR is to backport the PRs https://github.com/apache/spark/pull/14559 and 
https://github.com/apache/spark/pull/14683

---
This PR allows the user to pass additional options when the JDBC writer creates a new table.
The options can be table_options or partition_options,
e.g. "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8".

Here is the usage example:
```
df.write.option("createTableOptions", "ENGINE=InnoDB DEFAULT 
CHARSET=utf8").jdbc(...)
```
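
For context, a fuller but minimal sketch of how this option flows through a `DataFrameWriter.jdbc` call; the URL, table name, and credentials below are placeholders, and `df` is assumed to be an existing DataFrame:
```
import java.util.Properties

// Hypothetical connection details; replace with a real MySQL endpoint.
val jdbcUrl = "jdbc:mysql://localhost:3306/test"
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")

// createTableOptions is appended to the CREATE TABLE statement that the
// JDBC writer generates when the target table does not exist yet.
df.write
  .mode("overwrite")
  .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8")
  .jdbc(jdbcUrl, "t", connectionProperties)
```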
### How was this patch tested?
Added a test case.

Author: gatorsmile 

Closes #16634 from gatorsmile/backportSPARK-16968.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fc053c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fc053c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fc053c3

Branch: refs/heads/branch-2.0
Commit: 9fc053c30ae3670a1bcacacf33838750ddaca676
Parents: ee4e8fa
Author: GraceH 
Authored: Wed Jan 18 20:58:31 2017 -0800
Committer: gatorsmile 
Committed: Wed Jan 18 20:58:31 2017 -0800

--
 docs/sql-programming-guide.md   |  7 ++
 .../org/apache/spark/sql/DataFrameWriter.scala  | 24 ++--
 .../datasources/jdbc/JDBCOptions.scala  | 17 +-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  | 10 
 4 files changed, 50 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9fc053c3/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d38aed1..274310f 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1023,6 +1023,13 @@ the Data Sources API. The following options are 
supported:
   The JDBC fetch size, which determines how many rows to fetch per round 
trip. This can help performance on JDBC drivers which default to low fetch size 
(eg. Oracle with 10 rows).
 
   
+
+  <tr>
+    <td><code>createTableOptions</code></td>
+    <td>
+      This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table. For example: <code>CREATE TABLE t (name string) ENGINE=InnoDB.</code>
+    </td>
+  </tr>
 
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9fc053c3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a4c4a5d..3cad7df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
 import org.apache.spark.sql.execution.datasources.{BucketSpec, 
CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
 
 /**
  * Interface used to write a [[Dataset]] to external storage systems (e.g. 
file systems,
@@ -399,6 +399,12 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 assertNotPartitioned("jdbc")
 assertNotBucketed("jdbc")
 
+// to add required options like URL and dbtable
+val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table)
+val jdbcOptions = new JDBCOptions(params)
+val jdbcUrl = jdbcOptions.url
+val jdbcTable = jdbcOptions.table
+
 val props = new Properties()
 extraOptions.foreach { case (key, value) =>
   props.put(key, value)
@@ -408,25 +414,29 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
 val conn = JdbcUtils.createConnectionFactory(url, props)()
 
 try {
-  var tableExists = JdbcUtils.tableExists(conn, url, table)
+  var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable)
 
   if (mode == SaveMode.Ignore && tableExists) {
 return
   }
 
   if (mode == SaveMode.ErrorIfExists && tableExists) {
-sys.error(s"Table $table already exists.")
+sys.error(s"Table $jdbcTable already exists.")
   }
 
   if (mode == SaveMode.Overwrite && tableExists) {
-JdbcUtils.dropTable(conn, table)
+JdbcUtils.dropTable(conn, jdbcTable)
 tab

spark git commit: Update known_translations for contributor names

2017-01-18 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master fe409f31d -> 0c9231858


Update known_translations for contributor names

## What changes were proposed in this pull request?
Update known_translations per 
https://github.com/apache/spark/pull/16423#issuecomment-269739634

Author: Yin Huai 

Closes #16628 from yhuai/known_translations.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c923185
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c923185
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c923185

Branch: refs/heads/master
Commit: 0c9231858866eff16f97df073d22811176fb6b36
Parents: fe409f3
Author: Yin Huai 
Authored: Wed Jan 18 18:18:51 2017 -0800
Committer: Yin Huai 
Committed: Wed Jan 18 18:18:51 2017 -0800

--
 dev/create-release/known_translations | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c923185/dev/create-release/known_translations
--
diff --git a/dev/create-release/known_translations 
b/dev/create-release/known_translations
index 0f30990..87bf2f2 100644
--- a/dev/create-release/known_translations
+++ b/dev/create-release/known_translations
@@ -177,7 +177,7 @@ anabranch - Bill Chambers
 ashangit - Nicolas Fraison
 avulanov - Alexander Ulanov
 biglobster - Liang Ke
-cenyuhai - Cen Yu Hai
+cenyuhai - Yuhai Cen
 codlife - Jianfei Wang
 david-weiluo-ren - Weiluo (David) Ren
 dding3 - Ding Ding
@@ -198,7 +198,8 @@ petermaxlee - Peter Lee
 phalodi - Sandeep Purohit
 pkch - pkch
 priyankagargnitk - Priyanka Garg
-sharkdtu - Sharkd Tu
+sharkdtu - Xiaogang Tu
 shenh062326 - Shen Hong
 aokolnychyi - Anton Okolnychyi
 linbojin - Linbo Jin
+lw-lin - Liwei Lin





spark git commit: [SPARK-14975][ML] Fixed GBTClassifier to predict probability per training instance and fixed interfaces

2017-01-18 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master a81e336f1 -> fe409f31d


[SPARK-14975][ML] Fixed GBTClassifier to predict probability per training 
instance and fixed interfaces

## What changes were proposed in this pull request?

All of the classifiers in MLlib can predict probabilities except GBTClassifier.
Also, all classifiers inherit from ProbabilisticClassifier, but GBTClassifier strangely inherits from Predictor, which is a bug.
This change corrects the interface and adds the ability for the classifier to produce a probability vector.
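
A minimal usage sketch of what this enables (column names assume the standard ML defaults, and `training`/`test` are placeholder DataFrames with `label` and `features` columns):
```
import org.apache.spark.ml.classification.GBTClassifier

// GBTClassificationModel now sits in the probabilistic classifier hierarchy,
// so transform() emits rawPrediction and probability columns as well.
val gbt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)

val model = gbt.fit(training)
model.transform(test)
  .select("prediction", "probability")
  .show(5)
```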

## How was this patch tested?

The basic ML tests were run after making the changes.  I've marked this as WIP 
as I need to add more tests.

Author: Ilya Matiach 

Closes #16441 from imatiach-msft/ilmat/fix-GBT.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe409f31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe409f31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe409f31

Branch: refs/heads/master
Commit: fe409f31d966d99fcf57137581d1fb682c1c072a
Parents: a81e336
Author: Ilya Matiach 
Authored: Wed Jan 18 15:33:41 2017 -0800
Committer: Joseph K. Bradley 
Committed: Wed Jan 18 15:33:41 2017 -0800

--
 .../spark/ml/classification/GBTClassifier.scala |  94 ---
 .../org/apache/spark/ml/tree/treeParams.scala   |   4 +-
 .../apache/spark/mllib/tree/loss/LogLoss.scala  |  10 +-
 .../org/apache/spark/mllib/tree/loss/Loss.scala |   8 +-
 .../ml/classification/GBTClassifierSuite.scala  | 161 ++-
 5 files changed, 248 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe409f31/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
index c9bbd37..ade0960 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
@@ -23,9 +23,8 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.{PredictionModel, Predictor}
 import org.apache.spark.ml.feature.LabeledPoint
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.regression.DecisionTreeRegressionModel
 import org.apache.spark.ml.tree._
@@ -33,6 +32,7 @@ import org.apache.spark.ml.tree.impl.GradientBoostedTrees
 import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.DefaultParamsReader.Metadata
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
+import org.apache.spark.mllib.tree.loss.LogLoss
 import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => 
OldGBTModel}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
@@ -58,7 +58,7 @@ import org.apache.spark.sql.functions._
 @Since("1.4.0")
 class GBTClassifier @Since("1.4.0") (
 @Since("1.4.0") override val uid: String)
-  extends Predictor[Vector, GBTClassifier, GBTClassificationModel]
+  extends ProbabilisticClassifier[Vector, GBTClassifier, 
GBTClassificationModel]
   with GBTClassifierParams with DefaultParamsWritable with Logging {
 
   @Since("1.4.0")
@@ -158,12 +158,19 @@ class GBTClassifier @Since("1.4.0") (
 val numFeatures = oldDataset.first().features.size
 val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, 
OldAlgo.Classification)
 
+val numClasses = 2
+if (isDefined(thresholds)) {
+  require($(thresholds).length == numClasses, this.getClass.getSimpleName +
+".train() called with non-matching numClasses and thresholds.length." +
+s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
+}
+
 val instr = Instrumentation.create(this, oldDataset)
 instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType,
   maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, 
minInstancesPerNode,
   seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval)
 instr.logNumFeatures(numFeatures)
-instr.logNumClasses(2)
+instr.logNumClasses(numClasses)
 
 val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, 
boostingStrategy,
   $(seed))
@@ -202,8 +209,9 @@ class GBTClassificationModel private[ml](
 @Since("1.6.0") override val uid: String,
 private val _trees: Array[DecisionTreeRegressionModel],
 private val _treeWeights: Array[Double],
-

spark git commit: [SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs

2017-01-18 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 569e50680 -> a81e336f1


[SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not 
block UI when generating Streaming jobs

## What changes were proposed in this pull request?

When DStreamGraph is generating a job, it holds a lock and blocks other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to a slow Kafka cluster to fetch metadata).
It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that the UI is not blocked by job generation.
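
The locking pattern used by the fix, sketched in isolation (class and field names here are illustrative, not the actual Spark code):
```
// Values the UI needs are snapshotted into @volatile fields while the lock is
// held during start(); readers then see a consistent value without acquiring
// the lock that job generation may be holding.
class GraphLike {
  @volatile private var receiverCount: Int = 0
  @volatile private var streamNameAndId: Seq[(String, Int)] = Nil

  // (name, id, isReceiver) triples registered before start() is called.
  private val inputs = scala.collection.mutable.ArrayBuffer[(String, Int, Boolean)]()

  def start(): Unit = this.synchronized {
    // ... heavy initialization work runs here under the lock ...
    receiverCount = inputs.count(_._3)
    streamNameAndId = inputs.map { case (name, id, _) => (name, id) }.toList
  }

  // Lock-free reads for the UI thread.
  def getNumReceivers: Int = receiverCount
  def getStreamNameAndId: Seq[(String, Int)] = streamNameAndId
}
```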

## How was this patch tested?
Existing unit tests.

cc zsxwing

Author: uncleGen 

Closes #16601 from uncleGen/SPARK-19182.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a81e336f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a81e336f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a81e336f

Branch: refs/heads/master
Commit: a81e336f1eddc2c6245d807aae2c81ddc60eabf9
Parents: 569e506
Author: uncleGen 
Authored: Wed Jan 18 10:55:31 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Jan 18 10:55:31 2017 -0800

--
 .../org/apache/spark/streaming/DStreamGraph.scala  | 13 +
 .../streaming/ui/StreamingJobProgressListener.scala|  8 
 2 files changed, 13 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a81e336f/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 54d736e..dce2028 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends 
Serializable with Logging {
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()
 
+  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil
+
   var rememberDuration: Duration = null
   var checkpointInProgress = false
 
   var zeroTime: Time = null
   var startTime: Time = null
   var batchDuration: Duration = null
+  @volatile private var numReceivers: Int = 0
 
   def start(time: Time) {
 this.synchronized {
@@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends 
Serializable with Logging {
   startTime = time
   outputStreams.foreach(_.initialize(zeroTime))
   outputStreams.foreach(_.remember(rememberDuration))
-  outputStreams.foreach(_.validateAtStart)
+  outputStreams.foreach(_.validateAtStart())
+  numReceivers = 
inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
+  inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
   inputStreams.par.foreach(_.start())
 }
   }
@@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends 
Serializable with Logging {
   .toArray
   }
 
-  def getInputStreamName(streamId: Int): Option[String] = synchronized {
-inputStreams.find(_.id == streamId).map(_.name)
-  }
+  def getNumReceivers: Int = numReceivers
+
+  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
 
   def generateJobs(time: Time): Seq[Job] = {
 logDebug("Generating jobs for time " + time)

http://git-wip-us.apache.org/repos/asf/spark/blob/a81e336f/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 95f5821..ed4c1e4 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: 
StreamingContext)
   }
 
   def numInactiveReceivers: Int = {
-ssc.graph.getReceiverInputStreams().length - numActiveReceivers
+ssc.graph.getNumReceivers - numActiveReceivers
   }
 
   def numTotalCompletedBatches: Long = synchronized {
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: 
StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-completedBatchUIData.toSeq
+completedBa

spark git commit: [SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error

2017-01-18 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master c050c1227 -> 569e50680


[SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error

## What changes were proposed in this pull request?

We should call `StateStore.abort()` if any error occurs before the store is committed.

## How was this patch tested?

Manually.

Author: Liwei Lin 

Closes #16547 from lw-lin/append-filter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/569e5068
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/569e5068
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/569e5068

Branch: refs/heads/master
Commit: 569e50680f97b1ed054337a39fe198769ef52d93
Parents: c050c12
Author: Liwei Lin 
Authored: Wed Jan 18 10:52:47 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Jan 18 10:52:47 2017 -0800

--
 .../spark/sql/execution/streaming/StatefulAggregate.scala| 8 
 .../streaming/state/HDFSBackedStateStoreProvider.scala   | 2 +-
 .../spark/sql/execution/streaming/state/StateStore.scala | 2 +-
 3 files changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 0551e4b..d4ccced 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.TaskContext
 
 
 /** Used to identify the state store for a given operator. */
@@ -150,6 +151,13 @@ case class StateStoreSaveExec(
 val numTotalStateRows = longMetric("numTotalStateRows")
 val numUpdatedStateRows = longMetric("numUpdatedStateRows")
 
+// Abort the state store in case of error
+TaskContext.get().addTaskCompletionListener(_ => {
+  if (!store.hasCommitted) {
+store.abort()
+  }
+})
+
 outputMode match {
   // Update and output all rows in the StateStore.
   case Some(Complete) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 4f3f818..1279b71 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -203,7 +203,7 @@ private[state] class HDFSBackedStateStoreProvider(
 /**
  * Whether all updates have been committed
  */
-override private[state] def hasCommitted: Boolean = {
+override private[streaming] def hasCommitted: Boolean = {
   state == COMMITTED
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9bc6c0e..d59746f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -83,7 +83,7 @@ trait StateStore {
   /**
* Whether all updates have been committed
*/
-  private[state] def hasCommitted: Boolean
+  private[streaming] def hasCommitted: Boolean
 }
 
 





spark git commit: [SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error

2017-01-18 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 047506bae -> 4cff0b504


[SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error

## What changes were proposed in this pull request?

We should call `StateStore.abort()` if any error occurs before the store is committed.

## How was this patch tested?

Manually.

Author: Liwei Lin 

Closes #16547 from lw-lin/append-filter.

(cherry picked from commit 569e50680f97b1ed054337a39fe198769ef52d93)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cff0b50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cff0b50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cff0b50

Branch: refs/heads/branch-2.1
Commit: 4cff0b504c367db314f10e730fe39dc083529f16
Parents: 047506b
Author: Liwei Lin 
Authored: Wed Jan 18 10:52:47 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Jan 18 10:52:54 2017 -0800

--
 .../spark/sql/execution/streaming/StatefulAggregate.scala| 8 
 .../streaming/state/HDFSBackedStateStoreProvider.scala   | 2 +-
 .../spark/sql/execution/streaming/state/StateStore.scala | 2 +-
 3 files changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4cff0b50/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 0551e4b..d4ccced 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.TaskContext
 
 
 /** Used to identify the state store for a given operator. */
@@ -150,6 +151,13 @@ case class StateStoreSaveExec(
 val numTotalStateRows = longMetric("numTotalStateRows")
 val numUpdatedStateRows = longMetric("numUpdatedStateRows")
 
+// Abort the state store in case of error
+TaskContext.get().addTaskCompletionListener(_ => {
+  if (!store.hasCommitted) {
+store.abort()
+  }
+})
+
 outputMode match {
   // Update and output all rows in the StateStore.
   case Some(Complete) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/4cff0b50/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 4f3f818..1279b71 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -203,7 +203,7 @@ private[state] class HDFSBackedStateStoreProvider(
 /**
  * Whether all updates have been committed
  */
-override private[state] def hasCommitted: Boolean = {
+override private[streaming] def hasCommitted: Boolean = {
   state == COMMITTED
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4cff0b50/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9bc6c0e..d59746f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -83,7 +83,7 @@ trait StateStore {
   /**
* Whether all updates have been committed
*/
-  private[state] def hasCommitted: Boolean
+  private[streaming] def hasCommitted: Boolean
 }
 
 





spark git commit: [SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from awaitInitialization to avoid breaking tests

2017-01-18 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 77202a6c5 -> 047506bae


[SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from 
awaitInitialization to avoid breaking tests

## What changes were proposed in this pull request?

#16492 missed one race condition: `StreamExecution.awaitInitialization` may 
throw fatal errors and fail the test. This PR just ignores 
`StreamingQueryException` thrown from `awaitInitialization` so that we can 
verify the exception in the `ExpectFailure` action later. It's fine since 
`StopStream` or `ExpectFailure` will catch `StreamingQueryException` as well.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16567 from zsxwing/SPARK-19113-2.

(cherry picked from commit c050c12274fba2ac4c4938c4724049a47fa59280)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/047506ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/047506ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/047506ba

Branch: refs/heads/branch-2.1
Commit: 047506bae4f9a3505ac886ba04969d8d11f5
Parents: 77202a6
Author: Shixiong Zhu 
Authored: Wed Jan 18 10:50:51 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Jan 18 10:51:00 2017 -0800

--
 .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/047506ba/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 4aa4100..af2f31a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -385,7 +385,12 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
 .streamingQuery
 // Wait until the initialization finishes, because some tests need 
to use `logicalPlan`
 // after starting the query.
-currentStream.awaitInitialization(streamingTimeout.toMillis)
+try {
+  currentStream.awaitInitialization(streamingTimeout.toMillis)
+} catch {
+  case _: StreamingQueryException =>
+// Ignore the exception. `StopStream` or `ExpectFailure` will 
catch it as well.
+}
 
   case AdvanceManualClock(timeToAdd) =>
 verify(currentStream != null,





spark git commit: [SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from awaitInitialization to avoid breaking tests

2017-01-18 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 33791a8ce -> c050c1227


[SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from 
awaitInitialization to avoid breaking tests

## What changes were proposed in this pull request?

#16492 missed one race condition: `StreamExecution.awaitInitialization` may 
throw fatal errors and fail the test. This PR just ignores 
`StreamingQueryException` thrown from `awaitInitialization` so that we can 
verify the exception in the `ExpectFailure` action later. It's fine since 
`StopStream` or `ExpectFailure` will catch `StreamingQueryException` as well.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16567 from zsxwing/SPARK-19113-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c050c122
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c050c122
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c050c122

Branch: refs/heads/master
Commit: c050c12274fba2ac4c4938c4724049a47fa59280
Parents: 33791a8
Author: Shixiong Zhu 
Authored: Wed Jan 18 10:50:51 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Jan 18 10:50:51 2017 -0800

--
 .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c050c122/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 4aa4100..af2f31a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -385,7 +385,12 @@ trait StreamTest extends QueryTest with SharedSQLContext 
with Timeouts {
 .streamingQuery
 // Wait until the initialization finishes, because some tests need 
to use `logicalPlan`
 // after starting the query.
-currentStream.awaitInitialization(streamingTimeout.toMillis)
+try {
+  currentStream.awaitInitialization(streamingTimeout.toMillis)
+} catch {
+  case _: StreamingQueryException =>
+// Ignore the exception. `StopStream` or `ExpectFailure` will 
catch it as well.
+}
 
   case AdvanceManualClock(timeToAdd) =>
 verify(currentStream != null,





spark git commit: [SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver idempotent.

2017-01-18 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 278fa1eb3 -> 33791a8ce


[SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver 
idempotent.

## What changes were proposed in this pull request?

Method canCommit sends AskPermissionToCommitOutput using askWithRetry. If the request times out, it is sent again, so AskPermissionToCommitOutput can be received multiple times. canCommit should therefore return the same value when called multiple times by the same attempt.

Before this fix, handleAskPermissionToCommit only checked whether a committer was already registered, which is not enough: when the worker retries AskPermissionToCommitOutput it gets a CommitDeniedException, and the task fails with reason TaskCommitDenied, which is not regarded as a task failure (SPARK-11178), so the TaskScheduler reschedules the task indefinitely.

In this fix, `ask` replaces `askWithRetry` in `canCommit`, and the receiver is made idempotent.
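
The idempotency requirement amounts to answering a repeated request from the already-authorized attempt the same way as the first time; a simplified sketch (not the actual coordinator code):
```
// authorizedCommitters(partition) holds the attempt number that was granted
// the commit right for that partition, or -1 if no attempt was granted yet.
def handleAskPermission(
    authorizedCommitters: Array[Int],
    partition: Int,
    attemptNumber: Int): Boolean = {
  authorizedCommitters(partition) match {
    case -1 =>
      // First request wins and is recorded.
      authorizedCommitters(partition) = attemptNumber
      true
    case existing if existing == attemptNumber =>
      // Duplicate request from the same attempt (e.g. a retried RPC):
      // return the same answer instead of denying it.
      true
    case _ =>
      // A different attempt already holds the commit right.
      false
  }
}
```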

## How was this patch tested?

Added a new unit test to OutputCommitCoordinatorSuite.

Author: jinxing 

Closes #16503 from jinxing64/SPARK-18113.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33791a8c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33791a8c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33791a8c

Branch: refs/heads/master
Commit: 33791a8ced61d1ffa09f68033d240f874fdb1593
Parents: 278fa1e
Author: jinxing 
Authored: Wed Jan 18 10:47:22 2017 -0800
Committer: Marcelo Vanzin 
Committed: Wed Jan 18 10:47:22 2017 -0800

--
 .../scheduler/OutputCommitCoordinator.scala  | 19 +++
 .../scheduler/OutputCommitCoordinatorSuite.scala | 16 
 2 files changed, 31 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/33791a8c/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 7bed685..08d220b 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, 
RpcEnv}
+import org.apache.spark.util.{RpcUtils, ThreadUtils}
 
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
@@ -88,7 +89,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, 
isDriver: Boolean)
 val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
 coordinatorRef match {
   case Some(endpointRef) =>
-endpointRef.askWithRetry[Boolean](msg)
+ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
+  RpcUtils.askRpcTimeout(conf).duration)
   case None =>
 logError(
   "canCommit called after coordinator was stopped (is SparkEnv 
shutdown in progress)?")
@@ -165,9 +167,18 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
 authorizedCommitters(partition) = attemptNumber
 true
   case existingCommitter =>
-logDebug(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
-  s"partition=$partition; existingCommitter = $existingCommitter")
-false
+// Coordinator should be idempotent when receiving 
AskPermissionToCommit.
+if (existingCommitter == attemptNumber) {
+  logWarning(s"Authorizing duplicate request to commit for " +
+s"attemptNumber=$attemptNumber to commit for stage=$stage," +
+s" partition=$partition; existingCommitter = 
$existingCommitter." +
+s" This can indicate dropped network traffic.")
+  true
+} else {
+  logDebug(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
+s"partition=$partition; existingCommitter = 
$existingCommitter")
+  false
+}
 }
   case None =>
 logDebug(s"Stage $stage has completed, so not allowing attempt number 
$attemptNumber of" +

http://git-wip-us.apache.org/repos/asf/spark/blob/33791a8c/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 
b/core/src/test/scala/org/apache/spark/schedu

spark git commit: [SPARK-19231][SPARKR] add error handling for download and untar for Spark release

2017-01-18 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master d06172b88 -> 278fa1eb3


[SPARK-19231][SPARKR] add error handling for download and untar for Spark 
release

## What changes were proposed in this pull request?

When SparkR starts as a package and needs to download the Spark release distribution, we need to handle errors from the download and untar steps and clean up afterwards; otherwise it gets stuck.

## How was this patch tested?

manually

Author: Felix Cheung 

Closes #16589 from felixcheung/rtarreturncode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/278fa1eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/278fa1eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/278fa1eb

Branch: refs/heads/master
Commit: 278fa1eb305220a85c816c948932d6af8fa619aa
Parents: d06172b
Author: Felix Cheung 
Authored: Wed Jan 18 09:53:14 2017 -0800
Committer: Felix Cheung 
Committed: Wed Jan 18 09:53:14 2017 -0800

--
 R/pkg/R/install.R | 55 --
 1 file changed, 40 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/278fa1eb/R/pkg/R/install.R
--
diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R
index cb6bbe5..72386e6 100644
--- a/R/pkg/R/install.R
+++ b/R/pkg/R/install.R
@@ -54,7 +54,7 @@
 #' }
 #' @param overwrite If \code{TRUE}, download and overwrite the existing tar 
file in localDir
 #'  and force re-install Spark (in case the local directory or 
file is corrupted)
-#' @return \code{install.spark} returns the local directory where Spark is 
found or installed
+#' @return the (invisible) local directory where Spark is found or installed
 #' @rdname install.spark
 #' @name install.spark
 #' @aliases install.spark
@@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", 
mirrorUrl = NULL,
   } else {
 if (releaseUrl != "") {
   message("Downloading from alternate URL:\n- ", releaseUrl)
-  downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", 
releaseUrl))
+  success <- downloadUrl(releaseUrl, packageLocalPath)
+  if (!success) {
+unlink(packageLocalPath)
+stop(paste0("Fetch failed from ", releaseUrl))
+  }
 } else {
   robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, 
packageLocalPath)
 }
   }
 
   message(sprintf("Installing to %s", localDir))
-  untar(tarfile = packageLocalPath, exdir = localDir)
-  if (!tarExists || overwrite) {
+  # There are two ways untar can fail - untar could stop() on errors like 
incomplete block on file
+  # or, tar command can return failure code
+  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
+ error = function(e) {
+   message(e)
+   message()
+   FALSE
+ },
+ warning = function(w) {
+   # Treat warning as error, add an empty line with 
message()
+   message(w)
+   message()
+   FALSE
+ })
+  if (!tarExists || overwrite || !success) {
 unlink(packageLocalPath)
   }
+  if (!success) stop("Extract archive failed.")
   message("DONE.")
   Sys.setenv(SPARK_HOME = packageLocalDir)
   message(paste("SPARK_HOME set to", packageLocalDir))
@@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl 
= NULL,
 robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, 
packageLocalPath) {
   # step 1: use user-provided url
   if (!is.null(mirrorUrl)) {
-msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
-message(msg)
+message("Use user-provided mirror site: ", mirrorUrl)
 success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
packageName, packageLocalPath)
 if (success) {
@@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl, version, 
hadoopVersion, packageName, pa
packageName, packageLocalPath)
 if (success) return()
   } else {
-message("Unable to find preferred mirror site.")
+message("Unable to download from preferred mirror site: ", mirrorUrl)
   }
 
   # step 3: use backup option
@@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, 
hadoopVersion, packageName, pa
   success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
  packageName, packageLocalPath)
   if (success) {
-return(packageLocalPath)
+return()
   } else {
+# remove any partially downloaded file
+unlink(packageLocalPath)
+message("Unable to downl

spark git commit: [SPARK-19231][SPARKR] add error handling for download and untar for Spark release

2017-01-18 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 29b954bba -> 77202a6c5


[SPARK-19231][SPARKR] add error handling for download and untar for Spark 
release

## What changes were proposed in this pull request?

When SparkR starts as a package and needs to download the Spark release distribution, we need to handle errors from the download and untar steps and clean up afterwards; otherwise it gets stuck.

## How was this patch tested?

manually

Author: Felix Cheung 

Closes #16589 from felixcheung/rtarreturncode.

(cherry picked from commit 278fa1eb305220a85c816c948932d6af8fa619aa)
Signed-off-by: Felix Cheung 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77202a6c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77202a6c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77202a6c

Branch: refs/heads/branch-2.1
Commit: 77202a6c57e6ac2438cdb6bd232a187b6734fa2b
Parents: 29b954b
Author: Felix Cheung 
Authored: Wed Jan 18 09:53:14 2017 -0800
Committer: Felix Cheung 
Committed: Wed Jan 18 09:53:31 2017 -0800

--
 R/pkg/R/install.R | 55 --
 1 file changed, 40 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/77202a6c/R/pkg/R/install.R
--
diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R
index cb6bbe5..72386e6 100644
--- a/R/pkg/R/install.R
+++ b/R/pkg/R/install.R
@@ -54,7 +54,7 @@
 #' }
 #' @param overwrite If \code{TRUE}, download and overwrite the existing tar 
file in localDir
 #'  and force re-install Spark (in case the local directory or 
file is corrupted)
-#' @return \code{install.spark} returns the local directory where Spark is 
found or installed
+#' @return the (invisible) local directory where Spark is found or installed
 #' @rdname install.spark
 #' @name install.spark
 #' @aliases install.spark
@@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", 
mirrorUrl = NULL,
   } else {
 if (releaseUrl != "") {
   message("Downloading from alternate URL:\n- ", releaseUrl)
-  downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", 
releaseUrl))
+  success <- downloadUrl(releaseUrl, packageLocalPath)
+  if (!success) {
+unlink(packageLocalPath)
+stop(paste0("Fetch failed from ", releaseUrl))
+  }
 } else {
   robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, 
packageLocalPath)
 }
   }
 
   message(sprintf("Installing to %s", localDir))
-  untar(tarfile = packageLocalPath, exdir = localDir)
-  if (!tarExists || overwrite) {
+  # There are two ways untar can fail - untar could stop() on errors like 
incomplete block on file
+  # or, tar command can return failure code
+  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
+ error = function(e) {
+   message(e)
+   message()
+   FALSE
+ },
+ warning = function(w) {
+   # Treat warning as error, add an empty line with 
message()
+   message(w)
+   message()
+   FALSE
+ })
+  if (!tarExists || overwrite || !success) {
 unlink(packageLocalPath)
   }
+  if (!success) stop("Extract archive failed.")
   message("DONE.")
   Sys.setenv(SPARK_HOME = packageLocalDir)
   message(paste("SPARK_HOME set to", packageLocalDir))
@@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl 
= NULL,
 robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, 
packageLocalPath) {
   # step 1: use user-provided url
   if (!is.null(mirrorUrl)) {
-msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
-message(msg)
+message("Use user-provided mirror site: ", mirrorUrl)
 success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
packageName, packageLocalPath)
 if (success) {
@@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl, version, 
hadoopVersion, packageName, pa
packageName, packageLocalPath)
 if (success) return()
   } else {
-message("Unable to find preferred mirror site.")
+message("Unable to download from preferred mirror site: ", mirrorUrl)
   }
 
   # step 3: use backup option
@@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, 
hadoopVersion, packageName, pa
   success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
  packageName, packageLocalPath)
   if (success) {
-return(packageLocalPath)
+return()
   } els

spark git commit: [SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are based on HadoopRDD or NewHadoopRDD

2017-01-18 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master f85f29608 -> d06172b88


[SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are 
based on HadoopRDD or NewHadoopRDD

## What changes were proposed in this pull request?

For some datasources that are based on HadoopRDD or NewHadoopRDD, such as spark-xml, InputFileBlockHolder doesn't work with Python UDFs.

To reproduce it, run the following code with `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`:

from pyspark.sql.functions import udf, input_file_name
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

def filename(path):
    return path

session = SparkSession.builder.appName('APP').getOrCreate()

session.udf.register('sameText', filename)
sameText = udf(filename, StringType())

df = session.read.format('xml').load('a.xml', rowTag='root').select('*', input_file_name().alias('file'))
df.select('file').show()                 # works
df.select(sameText(df['file'])).show()   # returns empty content

The issue is that in `HadoopRDD` and `NewHadoopRDD` we set the file block's info in `InputFileBlockHolder` before the returned iterator begins consuming. `InputFileBlockHolder` records this info in a thread-local variable. When running Python UDFs in batch, we set up another thread to consume the iterator from the child plan's output RDD, so the info cannot be read back from that other thread.

To fix this, we have to set the info in `InputFileBlockHolder` after the iterator begins consuming, so that the info can be read in the correct thread.
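
The diff below also switches the holder from a plain `ThreadLocal` to an `InheritableThreadLocal`, whose value is visible to threads created by the thread that set it; a self-contained sketch of that difference:
```
object ThreadLocalDemo extends App {
  val plain = new ThreadLocal[String] {
    override protected def initialValue(): String = "unset"
  }
  val inheritable = new InheritableThreadLocal[String] {
    override protected def initialValue(): String = "unset"
  }

  plain.set("file:/a.xml")
  inheritable.set("file:/a.xml")

  val child = new Thread(new Runnable {
    override def run(): Unit = {
      // The plain ThreadLocal falls back to its initial value in the child
      // thread; the inheritable one carries over the parent's value.
      println(s"plain=${plain.get()} inheritable=${inheritable.get()}")
      // prints: plain=unset inheritable=file:/a.xml
    }
  })
  child.start()
  child.join()
}
```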

## How was this patch tested?

Manual test with the example code above for the spark-xml package on pyspark: `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`.

Added pyspark test.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #16585 from viirya/fix-inputfileblock-hadooprdd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d06172b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d06172b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d06172b8

Branch: refs/heads/master
Commit: d06172b88e61c0f79e3dea5703a17c6ae590f248
Parents: f85f296
Author: Liang-Chi Hsieh 
Authored: Wed Jan 18 23:06:44 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jan 18 23:06:44 2017 +0800

--
 .../apache/spark/rdd/InputFileBlockHolder.scala |  7 +++---
 python/pyspark/sql/tests.py | 24 
 2 files changed, 28 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d06172b8/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala 
b/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
index 9ba476d..ff2f58d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala
@@ -41,9 +41,10 @@ private[spark] object InputFileBlockHolder {
* The thread variable for the name of the current file being read. This is 
used by
* the InputFileName function in Spark SQL.
*/
-  private[this] val inputBlock: ThreadLocal[FileBlock] = new 
ThreadLocal[FileBlock] {
-override protected def initialValue(): FileBlock = new FileBlock
-  }
+  private[this] val inputBlock: InheritableThreadLocal[FileBlock] =
+new InheritableThreadLocal[FileBlock] {
+  override protected def initialValue(): FileBlock = new FileBlock
+}
 
   /**
* Returns the holding file name or empty string if it is unknown.

http://git-wip-us.apache.org/repos/asf/spark/blob/d06172b8/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a825028..73a5df6 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -435,6 +435,30 @@ class SQLTests(ReusedPySparkTestCase):
 row = 
self.spark.read.json(filePath).select(sourceFile(input_file_name())).first()
 self.assertTrue(row[0].find("people1.json") != -1)
 
+def test_udf_with_input_file_name_for_hadooprdd(self):
+from pyspark.sql.functions import udf, input_file_name
+from pyspark.sql.types import StringType
+
+def filename(path):
+return path
+
+sameText = udf(filename, StringType())
+
+rdd = self.sc.textFile('python/test_support/sql/people.json')
+df = self.spark.read.json(rdd).select(input_file_name().alias('file'))
+row = df.select(sameText(df['file'])).first()

spark git commit: [SPARK-19024][SQL] Implement new approach to write a permanent view

2017-01-18 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 17ce0b5b3 -> f85f29608


[SPARK-19024][SQL] Implement new approach to write a permanent view

## What changes were proposed in this pull request?

On CREATE/ALTER VIEW, we no longer need to generate a SQL text string from the LogicalPlan; instead we store the SQL query text, the output column names of the query plan, and the current database in the CatalogTable. Permanent views created by this approach can be resolved by the current view resolution approach.

The main advantages are:
1. If you update an underlying view, the current view also gets updated;
2. It gives us a chance to get rid of SQL generation for operators.

Major changes of this PR:
1. Generate the view-specific properties (e.g. view default database, view query output column names) during permanent view creation and store them as properties in the CatalogTable (see the sketch below);
2. Update the commands `CreateViewCommand` and `AlterViewAsCommand` and get rid of SQL generation in them.
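
A rough sketch of the flattening idea behind change 1, storing query output column names as plain string properties; the key names here are illustrative only, while the actual constants (e.g. `VIEW_QUERY_OUTPUT_NUM_COLUMNS`) live in the catalog code touched by the diff below:
```
// Hypothetical property keys for illustration only.
val NumColsKey = "view.query.out.numCols"
val ColPrefix = "view.query.out.col."

def viewColumnProperties(columns: Seq[String]): Map[String, String] = {
  if (columns.isEmpty) {
    Map.empty
  } else {
    Map(NumColsKey -> columns.length.toString) ++
      columns.zipWithIndex.map { case (name, i) => s"$ColPrefix$i" -> name }
  }
}

// viewColumnProperties(Seq("id", "name")) ==
//   Map("view.query.out.numCols" -> "2",
//       "view.query.out.col.0"   -> "id",
//       "view.query.out.col.1"   -> "name")
```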

## How was this patch tested?
Existing tests.

Author: jiangxingbo 

Closes #16613 from jiangxb1987/view-write-path.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f85f2960
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f85f2960
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f85f2960

Branch: refs/heads/master
Commit: f85f29608de801d7cacc779a77c8edaed8124acf
Parents: 17ce0b5
Author: jiangxingbo 
Authored: Wed Jan 18 19:13:01 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jan 18 19:13:01 2017 +0800

--
 .../spark/sql/catalyst/catalog/interface.scala  |  19 ---
 .../spark/sql/execution/command/views.scala | 164 +--
 .../org/apache/spark/sql/SQLQuerySuite.scala|  10 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  29 ++--
 .../spark/sql/hive/execution/SQLViewSuite.scala |  16 +-
 5 files changed, 146 insertions(+), 92 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2adccdd..80d3282 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -223,25 +223,6 @@ case class CatalogTable(
 )
   }
 
-  /**
-   * Insert/Update the view query output column names in `properties`.
-   */
-  def withQueryColumnNames(columns: Seq[String]): CatalogTable = {
-val props = new mutable.HashMap[String, String]
-if (columns.nonEmpty) {
-  props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString)
-  columns.zipWithIndex.foreach { case (colName, index) =>
-props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName)
-  }
-}
-
-// We can't use `filterKeys` here, as the map returned by `filterKeys` is 
not serializable,
-// while `CatalogTable` should be serializable.
-copy(properties = properties.filterNot { case (key, _) =>
-  key.startsWith(VIEW_QUERY_OUTPUT_PREFIX)
-} ++ props)
-  }
-
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
   locationUri: Option[String] = storage.locationUri,

http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 154141b..3da4bcf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
-import scala.util.control.NonFatal
+import scala.collection.mutable
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, 
UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
@@ -64,9 +64,9 @@ object PersistedView extends ViewType
 
 
 /**
- * Create or replace a view with given query plan. This command will convert 
the query plan to
- * canonicalized SQL s

spark git commit: [SPARK-18782][BUILD] Bump Hadoop 2.6 version to use Hadoop 2.6.5

2017-01-18 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master eefdf9f9d -> 17ce0b5b3


[SPARK-18782][BUILD] Bump Hadoop 2.6 version to use Hadoop 2.6.5

**What changes were proposed in this pull request?**

Use Hadoop 2.6.5 for the Hadoop 2.6 profile. The release notes list a number of fixes, including security fixes, that we should pick up.

**How was this patch tested?**

Running the unit tests with IBM's SDK for Java, and we'll see what happens with OpenJDK in the community builder; no trouble is expected as it is only a minor release.

Author: Adam Roberts 

Closes #16616 from a-roberts/Hadoop265Bumper.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17ce0b5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17ce0b5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17ce0b5b

Branch: refs/heads/master
Commit: 17ce0b5b3f6a825fc77458bc8608cece1a6019c7
Parents: eefdf9f
Author: Adam Roberts 
Authored: Wed Jan 18 09:46:34 2017 +
Committer: Sean Owen 
Committed: Wed Jan 18 09:46:34 2017 +

--
 dev/deps/spark-deps-hadoop-2.6 | 30 +++---
 pom.xml|  2 +-
 2 files changed, 16 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/17ce0b5b/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 7b9383d..fbd5d88 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -59,21 +59,21 @@ gson-2.2.4.jar
 guava-14.0.1.jar
 guice-3.0.jar
 guice-servlet-3.0.jar
-hadoop-annotations-2.6.4.jar
-hadoop-auth-2.6.4.jar
-hadoop-client-2.6.4.jar
-hadoop-common-2.6.4.jar
-hadoop-hdfs-2.6.4.jar
-hadoop-mapreduce-client-app-2.6.4.jar
-hadoop-mapreduce-client-common-2.6.4.jar
-hadoop-mapreduce-client-core-2.6.4.jar
-hadoop-mapreduce-client-jobclient-2.6.4.jar
-hadoop-mapreduce-client-shuffle-2.6.4.jar
-hadoop-yarn-api-2.6.4.jar
-hadoop-yarn-client-2.6.4.jar
-hadoop-yarn-common-2.6.4.jar
-hadoop-yarn-server-common-2.6.4.jar
-hadoop-yarn-server-web-proxy-2.6.4.jar
+hadoop-annotations-2.6.5.jar
+hadoop-auth-2.6.5.jar
+hadoop-client-2.6.5.jar
+hadoop-common-2.6.5.jar
+hadoop-hdfs-2.6.5.jar
+hadoop-mapreduce-client-app-2.6.5.jar
+hadoop-mapreduce-client-common-2.6.5.jar
+hadoop-mapreduce-client-core-2.6.5.jar
+hadoop-mapreduce-client-jobclient-2.6.5.jar
+hadoop-mapreduce-client-shuffle-2.6.5.jar
+hadoop-yarn-api-2.6.5.jar
+hadoop-yarn-client-2.6.5.jar
+hadoop-yarn-common-2.6.5.jar
+hadoop-yarn-server-common-2.6.5.jar
+hadoop-yarn-server-web-proxy-2.6.5.jar
 hk2-api-2.4.0-b34.jar
 hk2-locator-2.4.0-b34.jar
 hk2-utils-2.4.0-b34.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/17ce0b5b/pom.xml
--
diff --git a/pom.xml b/pom.xml
index ff37a88..fdc1917 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2571,7 +2571,7 @@
 
   hadoop-2.6
   
-2.6.4
+2.6.5
 0.9.3
 3.4.6
 2.6.0





spark git commit: [SPARK-19227][SPARK-19251] remove unused imports and outdated comments

2017-01-18 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 4494cd971 -> eefdf9f9d


[SPARK-19227][SPARK-19251] remove unused imports and outdated comments

## What changes were proposed in this pull request?
Remove unused imports and outdated comments, and fix some minor code style issues.

## How was this patch tested?
Existing unit tests.

Author: uncleGen 

Closes #16591 from uncleGen/SPARK-19227.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eefdf9f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eefdf9f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eefdf9f9

Branch: refs/heads/master
Commit: eefdf9f9dd8afde49ad7d4e230e2735eb817ab0a
Parents: 4494cd9
Author: uncleGen 
Authored: Wed Jan 18 09:44:32 2017 +
Committer: Sean Owen 
Committed: Wed Jan 18 09:44:32 2017 +

--
 .../scala/org/apache/spark/SecurityManager.scala   |  2 --
 .../main/scala/org/apache/spark/SparkContext.scala |  2 +-
 .../deploy/ExternalShuffleServiceSource.scala  |  2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala  |  2 +-
 .../apache/spark/deploy/worker/ui/LogPage.scala|  2 --
 .../apache/spark/internal/config/ConfigEntry.scala | 17 +
 .../spark/internal/config/ConfigReader.scala   |  1 -
 .../scala/org/apache/spark/rpc/RpcTimeout.scala|  5 ++---
 .../org/apache/spark/scheduler/ResultTask.scala|  1 -
 .../apache/spark/scheduler/ShuffleMapTask.scala|  1 -
 .../scala/org/apache/spark/scheduler/Stage.scala   |  1 -
 .../scala/org/apache/spark/scheduler/Task.scala|  1 -
 .../org/apache/spark/serializer/Serializer.scala   |  1 -
 .../spark/serializer/SerializerManager.scala   |  1 -
 .../spark/status/api/v1/ExecutorListResource.scala |  2 +-
 .../org/apache/spark/storage/StorageLevel.scala|  2 +-
 .../apache/spark/util/random/RandomSampler.scala   |  1 -
 .../apache/spark/InternalAccumulatorSuite.scala|  1 -
 .../netty/NettyBlockTransferServiceSuite.scala |  1 -
 .../org/apache/spark/util/DistributionSuite.scala  |  4 
 .../spark/examples/ml/BinarizerExample.scala   |  2 +-
 .../spark/examples/sql/SparkSQLExample.scala   |  4 
 .../sql/streaming/StructuredNetworkWordCount.scala |  1 -
 .../apache/spark/sql/kafka010/KafkaSource.scala|  2 +-
 .../apache/spark/streaming/kafka010/KafkaRDD.scala |  2 +-
 .../spark/streaming/kafka010/KafkaUtils.scala  |  1 -
 .../examples/streaming/KinesisWordCountASL.scala   |  2 +-
 .../streaming/kinesis/KinesisCheckpointer.scala|  2 +-
 .../kinesis/KinesisCheckpointerSuite.scala |  3 +--
 .../launcher/SparkSubmitOptionParserSuite.java |  3 ---
 .../spark/ml/classification/Classifier.scala   |  2 +-
 .../spark/ml/classification/GBTClassifier.scala|  1 -
 .../spark/ml/source/libsvm/LibSVMRelation.scala|  2 +-
 .../org/apache/spark/repl/SparkCommandLine.scala   |  4 ++--
 .../org/apache/spark/repl/SparkILoopInit.scala |  2 --
 .../deploy/yarn/ApplicationMasterArguments.scala   |  2 --
 .../security/HadoopFSCredentialProviderSuite.scala |  3 +--
 .../expressions/MonotonicallyIncreasingID.scala|  1 -
 .../aggregate/ApproximatePercentile.scala  |  1 -
 .../expressions/aggregate/Percentile.scala |  1 -
 .../catalyst/expressions/aggregate/collect.scala   |  2 --
 .../sql/catalyst/expressions/jsonExpressions.scala |  1 -
 .../sql/catalyst/expressions/nullExpressions.scala |  2 +-
 .../spark/sql/catalyst/planning/patterns.scala |  4 
 .../spark/sql/catalyst/plans/joinTypes.scala   |  1 -
 .../plans/logical/EventTimeWatermark.scala |  2 +-
 .../apache/spark/sql/hive/MetastoreRelation.scala  |  1 -
 47 files changed, 25 insertions(+), 79 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eefdf9f9/core/src/main/scala/org/apache/spark/SecurityManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 87fe563..9bdc509 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -26,11 +26,9 @@ import javax.net.ssl._
 import com.google.common.hash.HashCodes
 import com.google.common.io.Files
 import org.apache.hadoop.io.Text
-import org.apache.hadoop.security.Credentials
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
 import org.apache.spark.network.sasl.SecretKeyHolder
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/eefdf9f9/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff -