spark git commit: [SPARK-16968][SQL][BACKPORT-2.0] Add additional options in jdbc when creating a new table
Repository: spark Updated Branches: refs/heads/branch-2.0 ee4e8faff -> 9fc053c30 [SPARK-16968][SQL][BACKPORT-2.0] Add additional options in jdbc when creating a new table ### What changes were proposed in this pull request? This PR is to backport the PRs https://github.com/apache/spark/pull/14559 and https://github.com/apache/spark/pull/14683 --- In the PR, we just allow the user to add additional options when creating a new table in the JDBC writer. The options can be table_options or partition_options. E.g., "CREATE TABLE t (name string) ENGINE=InnoDB DEFAULT CHARSET=utf8" Here is the usage example: ``` df.write.option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8").jdbc(...) ``` ### How was this patch tested? Added a test case. Author: gatorsmile Closes #16634 from gatorsmile/backportSPARK-16968. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fc053c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fc053c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fc053c3 Branch: refs/heads/branch-2.0 Commit: 9fc053c30ae3670a1bcacacf33838750ddaca676 Parents: ee4e8fa Author: GraceH Authored: Wed Jan 18 20:58:31 2017 -0800 Committer: gatorsmile Committed: Wed Jan 18 20:58:31 2017 -0800 -- docs/sql-programming-guide.md | 7 ++ .../org/apache/spark/sql/DataFrameWriter.scala | 24 ++-- .../datasources/jdbc/JDBCOptions.scala | 17 +- .../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 10 4 files changed, 50 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9fc053c3/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index d38aed1..274310f 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1023,6 +1023,13 @@ the Data Sources API. The following options are supported: The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). + + +createTableOptions + + This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table. For example: CREATE TABLE t (name string) ENGINE=InnoDB. + + http://git-wip-us.apache.org/repos/asf/spark/blob/9fc053c3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a4c4a5d..3cad7df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project} import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, DataSource, HadoopFsRelation} -import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils +import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} /** * Interface used to write a [[Dataset]] to external storage systems (e.g.
file systems, @@ -399,6 +399,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertNotPartitioned("jdbc") assertNotBucketed("jdbc") +// to add required options like URL and dbtable +val params = extraOptions.toMap ++ Map("url" -> url, "dbtable" -> table) +val jdbcOptions = new JDBCOptions(params) +val jdbcUrl = jdbcOptions.url +val jdbcTable = jdbcOptions.table + val props = new Properties() extraOptions.foreach { case (key, value) => props.put(key, value) @@ -408,25 +414,29 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val conn = JdbcUtils.createConnectionFactory(url, props)() try { - var tableExists = JdbcUtils.tableExists(conn, url, table) + var tableExists = JdbcUtils.tableExists(conn, jdbcUrl, jdbcTable) if (mode == SaveMode.Ignore && tableExists) { return } if (mode == SaveMode.ErrorIfExists && tableExists) { -sys.error(s"Table $table already exists.") +sys.error(s"Table $jdbcTable already exists.") } if (mode == SaveMode.Overwrite && tableExists) { -JdbcUtils.dropTable(conn, table) +JdbcUtils.dropTable(conn, jdbcTable) tab
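Editor's note: for readers following along, here is a minimal end-to-end sketch of the backported option. The MySQL URL, database, table name, and credentials are hypothetical placeholders and do not come from the patch itself.

```scala
// Hedged sketch: writing a DataFrame through JDBC with the new
// createTableOptions option. All connection details below are hypothetical.
import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("CreateTableOptionsExample").getOrCreate()
import spark.implicits._

val df = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")

val props = new Properties()
props.put("user", "test")        // hypothetical credentials
props.put("password", "secret")

df.write
  .option("createTableOptions", "ENGINE=InnoDB DEFAULT CHARSET=utf8") // appended to CREATE TABLE
  .jdbc("jdbc:mysql://localhost:3306/testdb", "people", props)
```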
spark git commit: Update known_translations for contributor names
Repository: spark Updated Branches: refs/heads/master fe409f31d -> 0c9231858 Update known_translations for contributor names ## What changes were proposed in this pull request? Update known_translations per https://github.com/apache/spark/pull/16423#issuecomment-269739634 Author: Yin Huai Closes #16628 from yhuai/known_translations. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c923185 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c923185 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c923185 Branch: refs/heads/master Commit: 0c9231858866eff16f97df073d22811176fb6b36 Parents: fe409f3 Author: Yin Huai Authored: Wed Jan 18 18:18:51 2017 -0800 Committer: Yin Huai Committed: Wed Jan 18 18:18:51 2017 -0800 -- dev/create-release/known_translations | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0c923185/dev/create-release/known_translations -- diff --git a/dev/create-release/known_translations b/dev/create-release/known_translations index 0f30990..87bf2f2 100644 --- a/dev/create-release/known_translations +++ b/dev/create-release/known_translations @@ -177,7 +177,7 @@ anabranch - Bill Chambers ashangit - Nicolas Fraison avulanov - Alexander Ulanov biglobster - Liang Ke -cenyuhai - Cen Yu Hai +cenyuhai - Yuhai Cen codlife - Jianfei Wang david-weiluo-ren - Weiluo (David) Ren dding3 - Ding Ding @@ -198,7 +198,8 @@ petermaxlee - Peter Lee phalodi - Sandeep Purohit pkch - pkch priyankagargnitk - Priyanka Garg -sharkdtu - Sharkd Tu +sharkdtu - Xiaogang Tu shenh062326 - Shen Hong aokolnychyi - Anton Okolnychyi linbojin - Linbo Jin +lw-lin - Liwei Lin
spark git commit: [SPARK-14975][ML] Fixed GBTClassifier to predict probability per training instance and fixed interfaces
Repository: spark Updated Branches: refs/heads/master a81e336f1 -> fe409f31d [SPARK-14975][ML] Fixed GBTClassifier to predict probability per training instance and fixed interfaces ## What changes were proposed in this pull request? For all of the classifiers in MLlib we can predict probabilities except for GBTClassifier. Also, all other classifiers inherit from ProbabilisticClassifier, but GBTClassifier strangely inherits from Predictor, which is a bug. This change corrects the interface and adds the ability for the classifier to produce a probability vector. ## How was this patch tested? The basic ML tests were run after making the changes. I've marked this as WIP as I need to add more tests. Author: Ilya Matiach Closes #16441 from imatiach-msft/ilmat/fix-GBT. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe409f31 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe409f31 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe409f31 Branch: refs/heads/master Commit: fe409f31d966d99fcf57137581d1fb682c1c072a Parents: a81e336 Author: Ilya Matiach Authored: Wed Jan 18 15:33:41 2017 -0800 Committer: Joseph K. Bradley Committed: Wed Jan 18 15:33:41 2017 -0800 -- .../spark/ml/classification/GBTClassifier.scala | 94 --- .../org/apache/spark/ml/tree/treeParams.scala | 4 +- .../apache/spark/mllib/tree/loss/LogLoss.scala | 10 +- .../org/apache/spark/mllib/tree/loss/Loss.scala | 8 +- .../ml/classification/GBTClassifierSuite.scala | 161 ++- 5 files changed, 248 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fe409f31/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala index c9bbd37..ade0960 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala @@ -23,9 +23,8 @@ import org.json4s.JsonDSL._ import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.ml.{PredictionModel, Predictor} import org.apache.spark.ml.feature.LabeledPoint -import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.regression.DecisionTreeRegressionModel import org.apache.spark.ml.tree._ @@ -33,6 +32,7 @@ import org.apache.spark.ml.tree.impl.GradientBoostedTrees import org.apache.spark.ml.util._ import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo} +import org.apache.spark.mllib.tree.loss.LogLoss import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} @@ -58,7 +58,7 @@ import org.apache.spark.sql.functions._ @Since("1.4.0") class GBTClassifier @Since("1.4.0") ( @Since("1.4.0") override val uid: String) - extends Predictor[Vector, GBTClassifier, GBTClassificationModel] + extends ProbabilisticClassifier[Vector, GBTClassifier, GBTClassificationModel] with GBTClassifierParams with DefaultParamsWritable with Logging { @Since("1.4.0") @@ -158,12 +158,19 @@ class GBTClassifier @Since("1.4.0") ( val numFeatures = oldDataset.first().features.size val boostingStrategy =
super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification) +val numClasses = 2 +if (isDefined(thresholds)) { + require($(thresholds).length == numClasses, this.getClass.getSimpleName + +".train() called with non-matching numClasses and thresholds.length." + +s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") +} + val instr = Instrumentation.create(this, oldDataset) instr.logParams(labelCol, featuresCol, predictionCol, impurity, lossType, maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode, seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval) instr.logNumFeatures(numFeatures) -instr.logNumClasses(2) +instr.logNumClasses(numClasses) val (baseLearners, learnerWeights) = GradientBoostedTrees.run(oldDataset, boostingStrategy, $(seed)) @@ -202,8 +209,9 @@ class GBTClassificationModel private[ml]( @Since("1.6.0") override val uid: String, private val _trees: Array[DecisionTreeRegressionModel], private val _treeWeights: Array[Double], -
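Editor's note: to make the behavioral change concrete, here is a hedged, self-contained usage sketch. After this patch a fitted GBT model should emit probability and rawPrediction columns like the other ProbabilisticClassifier implementations; the toy dataset below is illustrative only.

```scala
// Hedged sketch of the new GBTClassifier behavior on a toy dataset.
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("GBTProbabilityExample").getOrCreate()

// Tiny toy dataset; a real pipeline would load labeled data instead.
val data = spark.createDataFrame(Seq(
  (0.0, Vectors.dense(0.0, 1.1)),
  (1.0, Vectors.dense(2.0, 1.0)),
  (0.0, Vectors.dense(0.1, 1.2)),
  (1.0, Vectors.dense(2.2, 0.9))
)).toDF("label", "features")

val model = new GBTClassifier().setMaxIter(5).fit(data)

// New with this patch: probability and rawPrediction columns are produced,
// as for the other probabilistic classifiers.
model.transform(data).select("prediction", "rawPrediction", "probability").show()
```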
spark git commit: [SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs
Repository: spark Updated Branches: refs/heads/master 569e50680 -> a81e336f1 [SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs ## What changes were proposed in this pull request? When DStreamGraph is generating a job, it will hold a lock and block other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to a slow Kafka cluster to fetch metadata). It's better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that the UI is not blocked by job generation. ## How was this patch tested? existing unit tests cc zsxwing Author: uncleGen Closes #16601 from uncleGen/SPARK-19182. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a81e336f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a81e336f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a81e336f Branch: refs/heads/master Commit: a81e336f1eddc2c6245d807aae2c81ddc60eabf9 Parents: 569e506 Author: uncleGen Authored: Wed Jan 18 10:55:31 2017 -0800 Committer: Shixiong Zhu Committed: Wed Jan 18 10:55:31 2017 -0800 -- .../org/apache/spark/streaming/DStreamGraph.scala | 13 + .../streaming/ui/StreamingJobProgressListener.scala| 8 2 files changed, 13 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a81e336f/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 54d736e..dce2028 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() + @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil + var rememberDuration: Duration = null var checkpointInProgress = false var zeroTime: Time = null var startTime: Time = null var batchDuration: Duration = null + @volatile private var numReceivers: Int = 0 def start(time: Time) { this.synchronized { @@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { startTime = time outputStreams.foreach(_.initialize(zeroTime)) outputStreams.foreach(_.remember(rememberDuration)) - outputStreams.foreach(_.validateAtStart) + outputStreams.foreach(_.validateAtStart()) + numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]]) + inputStreamNameAndID = inputStreams.map(is => (is.name, is.id)) inputStreams.par.foreach(_.start()) } } @@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { .toArray } - def getInputStreamName(streamId: Int): Option[String] = synchronized { -inputStreams.find(_.id == streamId).map(_.name) - } + def getNumReceivers: Int = numReceivers + + def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time)
http://git-wip-us.apache.org/repos/asf/spark/blob/a81e336f/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 95f5821..ed4c1e4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def numInactiveReceivers: Int = { -ssc.graph.getReceiverInputStreams().length - numActiveReceivers +ssc.graph.getNumReceivers - numActiveReceivers } def numTotalCompletedBatches: Long = synchronized { @@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def retainedCompletedBatches: Seq[BatchUIData] = synchronized { -completedBatchUIData.toSeq +completedBa
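Editor's note: the essence of the fix is to publish cheap snapshots of graph state through `@volatile` fields at start time, so UI reads never take the graph lock. Below is a simplified, self-contained sketch of that pattern; the class and method names are illustrative, not the actual Spark code.

```scala
// Illustrative sketch of the volatile-snapshot pattern used by the patch.
final class ToyGraph {
  private val inputStreams = scala.collection.mutable.ArrayBuffer[(String, Int)]()

  // Published once under the lock in start(); read lock-free afterwards.
  @volatile private var inputStreamNameAndId: Seq[(String, Int)] = Nil

  def addInput(name: String, id: Int): Unit = this.synchronized {
    inputStreams += ((name, id))
  }

  def start(): Unit = this.synchronized {
    // ...expensive job-generation setup may run here while holding the lock...
    inputStreamNameAndId = inputStreams.toList // immutable snapshot
  }

  // UI-facing read: no synchronization, so it cannot block on job generation.
  def getInputStreamNameAndId: Seq[(String, Int)] = inputStreamNameAndId
}
```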
spark git commit: [SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error
Repository: spark Updated Branches: refs/heads/master c050c1227 -> 569e50680 [SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error ## What changes were proposed in this pull request? We should call `StateStore.abort()` when any error occurs before the store is committed. ## How was this patch tested? Manually. Author: Liwei Lin Closes #16547 from lw-lin/append-filter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/569e5068 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/569e5068 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/569e5068 Branch: refs/heads/master Commit: 569e50680f97b1ed054337a39fe198769ef52d93 Parents: c050c12 Author: Liwei Lin Authored: Wed Jan 18 10:52:47 2017 -0800 Committer: Shixiong Zhu Committed: Wed Jan 18 10:52:47 2017 -0800 -- .../spark/sql/execution/streaming/StatefulAggregate.scala| 8 .../streaming/state/HDFSBackedStateStoreProvider.scala | 2 +- .../spark/sql/execution/streaming/state/StateStore.scala | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index 0551e4b..d4ccced 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType +import org.apache.spark.TaskContext /** Used to identify the state store for a given operator. */ @@ -150,6 +151,13 @@ case class StateStoreSaveExec( val numTotalStateRows = longMetric("numTotalStateRows") val numUpdatedStateRows = longMetric("numUpdatedStateRows") +// Abort the state store in case of error +TaskContext.get().addTaskCompletionListener(_ => { + if (!store.hasCommitted) { +store.abort() + } +}) + outputMode match { // Update and output all rows in the StateStore.
case Some(Complete) => http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 4f3f818..1279b71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -203,7 +203,7 @@ private[state] class HDFSBackedStateStoreProvider( /** * Whether all updates have been committed */ -override private[state] def hasCommitted: Boolean = { +override private[streaming] def hasCommitted: Boolean = { state == COMMITTED } http://git-wip-us.apache.org/repos/asf/spark/blob/569e5068/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 9bc6c0e..d59746f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -83,7 +83,7 @@ trait StateStore { /** * Whether all updates have been committed */ - private[state] def hasCommitted: Boolean + private[streaming] def hasCommitted: Boolean }
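Editor's note: a self-contained sketch of the abort-on-error contract that the completion listener enforces. `ToyStore` is illustrative, not Spark's actual `StateStore` API.

```scala
// Hedged sketch: if the task body fails before commit(), the store must be
// aborted so no partially written state lingers.
trait ToyStore {
  def commit(): Unit
  def abort(): Unit
  def hasCommitted: Boolean
}

def runTask(store: ToyStore)(body: => Unit): Unit = {
  try {
    body           // any error here leaves the store uncommitted
    store.commit()
  } finally {
    // Mirrors the TaskCompletionListener added by the patch.
    if (!store.hasCommitted) store.abort()
  }
}
```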
spark git commit: [SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error
Repository: spark Updated Branches: refs/heads/branch-2.1 047506bae -> 4cff0b504 [SPARK-19168][STRUCTURED STREAMING] StateStore should be aborted upon error ## What changes were proposed in this pull request? We should call `StateStore.abort()` when any error occurs before the store is committed. ## How was this patch tested? Manually. Author: Liwei Lin Closes #16547 from lw-lin/append-filter. (cherry picked from commit 569e50680f97b1ed054337a39fe198769ef52d93) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cff0b50 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cff0b50 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cff0b50 Branch: refs/heads/branch-2.1 Commit: 4cff0b504c367db314f10e730fe39dc083529f16 Parents: 047506b Author: Liwei Lin Authored: Wed Jan 18 10:52:47 2017 -0800 Committer: Shixiong Zhu Committed: Wed Jan 18 10:52:54 2017 -0800 -- .../spark/sql/execution/streaming/StatefulAggregate.scala| 8 .../streaming/state/HDFSBackedStateStoreProvider.scala | 2 +- .../spark/sql/execution/streaming/state/StateStore.scala | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4cff0b50/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index 0551e4b..d4ccced 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType +import org.apache.spark.TaskContext /** Used to identify the state store for a given operator. */ @@ -150,6 +151,13 @@ case class StateStoreSaveExec( val numTotalStateRows = longMetric("numTotalStateRows") val numUpdatedStateRows = longMetric("numUpdatedStateRows") +// Abort the state store in case of error +TaskContext.get().addTaskCompletionListener(_ => { + if (!store.hasCommitted) { +store.abort() + } +}) + outputMode match { // Update and output all rows in the StateStore.
case Some(Complete) => http://git-wip-us.apache.org/repos/asf/spark/blob/4cff0b50/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 4f3f818..1279b71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -203,7 +203,7 @@ private[state] class HDFSBackedStateStoreProvider( /** * Whether all updates have been committed */ -override private[state] def hasCommitted: Boolean = { +override private[streaming] def hasCommitted: Boolean = { state == COMMITTED } http://git-wip-us.apache.org/repos/asf/spark/blob/4cff0b50/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 9bc6c0e..d59746f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -83,7 +83,7 @@ trait StateStore { /** * Whether all updates have been committed */ - private[state] def hasCommitted: Boolean + private[streaming] def hasCommitted: Boolean }
spark git commit: [SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from awaitInitialization to avoid breaking tests
Repository: spark Updated Branches: refs/heads/branch-2.1 77202a6c5 -> 047506bae [SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from awaitInitialization to avoid breaking tests ## What changes were proposed in this pull request? #16492 missed one race condition: `StreamExecution.awaitInitialization` may throw fatal errors and fail the test. This PR just ignores `StreamingQueryException` thrown from `awaitInitialization` so that we can verify the exception in the `ExpectFailure` action later. It's fine since `StopStream` or `ExpectFailure` will catch `StreamingQueryException` as well. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16567 from zsxwing/SPARK-19113-2. (cherry picked from commit c050c12274fba2ac4c4938c4724049a47fa59280) Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/047506ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/047506ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/047506ba Branch: refs/heads/branch-2.1 Commit: 047506bae4f9a3505ac886ba04969d8d11f5 Parents: 77202a6 Author: Shixiong Zhu Authored: Wed Jan 18 10:50:51 2017 -0800 Committer: Shixiong Zhu Committed: Wed Jan 18 10:51:00 2017 -0800 -- .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/047506ba/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 4aa4100..af2f31a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -385,7 +385,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { .streamingQuery // Wait until the initialization finishes, because some tests need to use `logicalPlan` // after starting the query. -currentStream.awaitInitialization(streamingTimeout.toMillis) +try { + currentStream.awaitInitialization(streamingTimeout.toMillis) +} catch { + case _: StreamingQueryException => +// Ignore the exception. `StopStream` or `ExpectFailure` will catch it as well. +} case AdvanceManualClock(timeToAdd) => verify(currentStream != null,
spark git commit: [SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from awaitInitialization to avoid breaking tests
Repository: spark Updated Branches: refs/heads/master 33791a8ce -> c050c1227 [SPARK-19113][SS][TESTS] Ignore StreamingQueryException thrown from awaitInitialization to avoid breaking tests ## What changes were proposed in this pull request? #16492 missed one race condition: `StreamExecution.awaitInitialization` may throw fatal errors and fail the test. This PR just ignores `StreamingQueryException` thrown from `awaitInitialization` so that we can verify the exception in the `ExpectFailure` action later. It's fine since `StopStream` or `ExpectFailure` will catch `StreamingQueryException` as well. ## How was this patch tested? Jenkins Author: Shixiong Zhu Closes #16567 from zsxwing/SPARK-19113-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c050c122 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c050c122 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c050c122 Branch: refs/heads/master Commit: c050c12274fba2ac4c4938c4724049a47fa59280 Parents: 33791a8 Author: Shixiong Zhu Authored: Wed Jan 18 10:50:51 2017 -0800 Committer: Shixiong Zhu Committed: Wed Jan 18 10:50:51 2017 -0800 -- .../scala/org/apache/spark/sql/streaming/StreamTest.scala | 7 ++- 1 file changed, 6 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c050c122/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 4aa4100..af2f31a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -385,7 +385,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { .streamingQuery // Wait until the initialization finishes, because some tests need to use `logicalPlan` // after starting the query. -currentStream.awaitInitialization(streamingTimeout.toMillis) +try { + currentStream.awaitInitialization(streamingTimeout.toMillis) +} catch { + case _: StreamingQueryException => +// Ignore the exception. `StopStream` or `ExpectFailure` will catch it as well. +} case AdvanceManualClock(timeToAdd) => verify(currentStream != null,
spark git commit: [SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver idempotent.
Repository: spark Updated Branches: refs/heads/master 278fa1eb3 -> 33791a8ce [SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver idempotent. ## What changes were proposed in this pull request? The canCommit method sends AskPermissionToCommitOutput using askWithRetry. If the call times out, it is sent again, so AskPermissionToCommitOutput can be received multiple times. canCommit should return the same value when called by the same attempt multiple times. In the implementation before this fix, handleAskPermissionToCommit just checks whether a committer is already registered, which is not enough. When the worker retries AskPermissionToCommitOutput, it gets a CommitDeniedException, and the task fails with reason TaskCommitDenied, which is not regarded as a task failure (SPARK-11178), so TaskScheduler will reschedule this task indefinitely. In this fix, use `ask` to replace `askWithRetry` in `canCommit` and make the receiver idempotent. ## How was this patch tested? Added a new unit test to OutputCommitCoordinatorSuite. Author: jinxing Closes #16503 from jinxing64/SPARK-18113. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33791a8c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33791a8c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33791a8c Branch: refs/heads/master Commit: 33791a8ced61d1ffa09f68033d240f874fdb1593 Parents: 278fa1e Author: jinxing Authored: Wed Jan 18 10:47:22 2017 -0800 Committer: Marcelo Vanzin Committed: Wed Jan 18 10:47:22 2017 -0800 -- .../scheduler/OutputCommitCoordinator.scala | 19 +++ .../scheduler/OutputCommitCoordinatorSuite.scala | 16 2 files changed, 31 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/33791a8c/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index 7bed685..08d220b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -22,6 +22,7 @@ import scala.collection.mutable import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} +import org.apache.spark.util.{RpcUtils, ThreadUtils} private sealed trait OutputCommitCoordinationMessage extends Serializable @@ -88,7 +89,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber) coordinatorRef match { case Some(endpointRef) => -endpointRef.askWithRetry[Boolean](msg) +ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg), + RpcUtils.askRpcTimeout(conf).duration) case None => logError( "canCommit called after coordinator was stopped (is SparkEnv shutdown in progress)?") @@ -165,9 +167,18 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean) authorizedCommitters(partition) = attemptNumber true case existingCommitter => -logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " + - s"partition=$partition; existingCommitter = $existingCommitter") -false +// Coordinator should be idempotent when receiving AskPermissionToCommit.
+if (existingCommitter == attemptNumber) { + logWarning(s"Authorizing duplicate request to commit for " + +s"attemptNumber=$attemptNumber to commit for stage=$stage," + +s" partition=$partition; existingCommitter = $existingCommitter." + +s" This can indicate dropped network traffic.") + true +} else { + logDebug(s"Denying attemptNumber=$attemptNumber to commit for stage=$stage, " + +s"partition=$partition; existingCommitter = $existingCommitter") + false +} } case None => logDebug(s"Stage $stage has completed, so not allowing attempt number $attemptNumber of" + http://git-wip-us.apache.org/repos/asf/spark/blob/33791a8c/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/schedu
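Editor's note: the key idea is that a re-delivered permission request from the same attempt must receive the same answer. Here is a simplified, self-contained sketch of that idempotent check, not the full coordinator state machine.

```scala
// Hedged sketch of idempotent commit authorization, per partition.
import scala.collection.mutable

val authorizedCommitters = mutable.Map.empty[Int, Int] // partition -> attemptNumber

def handleAskPermissionToCommit(partition: Int, attemptNumber: Int): Boolean =
  authorizedCommitters.get(partition) match {
    case None =>
      authorizedCommitters(partition) = attemptNumber // first request wins
      true
    case Some(existing) =>
      // A retry of the winning attempt (e.g. after an RPC timeout) must be
      // re-authorized, not denied; any other attempt is still rejected.
      existing == attemptNumber
  }
```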
spark git commit: [SPARK-19231][SPARKR] add error handling for download and untar for Spark release
Repository: spark Updated Branches: refs/heads/master d06172b88 -> 278fa1eb3 [SPARK-19231][SPARKR] add error handling for download and untar for Spark release ## What changes were proposed in this pull request? When R is starting as a package and it needs to download the Spark release distribution, we need to handle errors for download and untar, and clean up; otherwise it will get stuck. ## How was this patch tested? manually Author: Felix Cheung Closes #16589 from felixcheung/rtarreturncode. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/278fa1eb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/278fa1eb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/278fa1eb Branch: refs/heads/master Commit: 278fa1eb305220a85c816c948932d6af8fa619aa Parents: d06172b Author: Felix Cheung Authored: Wed Jan 18 09:53:14 2017 -0800 Committer: Felix Cheung Committed: Wed Jan 18 09:53:14 2017 -0800 -- R/pkg/R/install.R | 55 -- 1 file changed, 40 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/278fa1eb/R/pkg/R/install.R -- diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R index cb6bbe5..72386e6 100644 --- a/R/pkg/R/install.R +++ b/R/pkg/R/install.R @@ -54,7 +54,7 @@ #' } #' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir #' and force re-install Spark (in case the local directory or file is corrupted) -#' @return \code{install.spark} returns the local directory where Spark is found or installed +#' @return the (invisible) local directory where Spark is found or installed #' @rdname install.spark #' @name install.spark #' @aliases install.spark @@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL, } else { if (releaseUrl != "") { message("Downloading from alternate URL:\n- ", releaseUrl) - downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl)) + success <- downloadUrl(releaseUrl, packageLocalPath) + if (!success) { +unlink(packageLocalPath) +stop(paste0("Fetch failed from ", releaseUrl)) + } } else { robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) } } message(sprintf("Installing to %s", localDir)) - untar(tarfile = packageLocalPath, exdir = localDir) - if (!tarExists || overwrite) { + # There are two ways untar can fail - untar could stop() on errors like incomplete block on file + # or, tar command can return failure code + success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0, + error = function(e) { + message(e) + message() + FALSE + }, + warning = function(w) { + # Treat warning as error, add an empty line with message() + message(w) + message() + FALSE + }) + if (!tarExists || overwrite || !success) { unlink(packageLocalPath) } + if (!success) stop("Extract archive failed.") message("DONE.") Sys.setenv(SPARK_HOME = packageLocalDir) message(paste("SPARK_HOME set to", packageLocalDir)) @@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL, robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) { # step 1: use user-provided url if (!is.null(mirrorUrl)) { -msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl) -message(msg) +message("Use user-provided mirror site: ", mirrorUrl) success <- directDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) if (success) { @@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl,
version, hadoopVersion, packageName, pa packageName, packageLocalPath) if (success) return() } else { -message("Unable to find preferred mirror site.") +message("Unable to download from preferred mirror site: ", mirrorUrl) } # step 3: use backup option @@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa success <- directDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) if (success) { -return(packageLocalPath) +return() } else { +# remove any partially downloaded file +unlink(packageLocalPath) +message("Unable to downl
spark git commit: [SPARK-19231][SPARKR] add error handling for download and untar for Spark release
Repository: spark Updated Branches: refs/heads/branch-2.1 29b954bba -> 77202a6c5 [SPARK-19231][SPARKR] add error handling for download and untar for Spark release ## What changes were proposed in this pull request? When R is starting as a package and it needs to download the Spark release distribution, we need to handle errors for download and untar, and clean up; otherwise it will get stuck. ## How was this patch tested? manually Author: Felix Cheung Closes #16589 from felixcheung/rtarreturncode. (cherry picked from commit 278fa1eb305220a85c816c948932d6af8fa619aa) Signed-off-by: Felix Cheung Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77202a6c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77202a6c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77202a6c Branch: refs/heads/branch-2.1 Commit: 77202a6c57e6ac2438cdb6bd232a187b6734fa2b Parents: 29b954b Author: Felix Cheung Authored: Wed Jan 18 09:53:14 2017 -0800 Committer: Felix Cheung Committed: Wed Jan 18 09:53:31 2017 -0800 -- R/pkg/R/install.R | 55 -- 1 file changed, 40 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/77202a6c/R/pkg/R/install.R -- diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R index cb6bbe5..72386e6 100644 --- a/R/pkg/R/install.R +++ b/R/pkg/R/install.R @@ -54,7 +54,7 @@ #' } #' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir #' and force re-install Spark (in case the local directory or file is corrupted) -#' @return \code{install.spark} returns the local directory where Spark is found or installed +#' @return the (invisible) local directory where Spark is found or installed #' @rdname install.spark #' @name install.spark #' @aliases install.spark @@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL, } else { if (releaseUrl != "") { message("Downloading from alternate URL:\n- ", releaseUrl) - downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl)) + success <- downloadUrl(releaseUrl, packageLocalPath) + if (!success) { +unlink(packageLocalPath) +stop(paste0("Fetch failed from ", releaseUrl)) + } } else { robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) } } message(sprintf("Installing to %s", localDir)) - untar(tarfile = packageLocalPath, exdir = localDir) - if (!tarExists || overwrite) { + # There are two ways untar can fail - untar could stop() on errors like incomplete block on file + # or, tar command can return failure code + success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0, + error = function(e) { + message(e) + message() + FALSE + }, + warning = function(w) { + # Treat warning as error, add an empty line with message() + message(w) + message() + FALSE + }) + if (!tarExists || overwrite || !success) { unlink(packageLocalPath) } + if (!success) stop("Extract archive failed.") message("DONE.") Sys.setenv(SPARK_HOME = packageLocalDir) message(paste("SPARK_HOME set to", packageLocalDir)) @@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL, robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) { # step 1: use user-provided url if (!is.null(mirrorUrl)) { -msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl) -message(msg) +message("Use user-provided mirror site: ", mirrorUrl) success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
packageName, packageLocalPath) if (success) { @@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa packageName, packageLocalPath) if (success) return() } else { -message("Unable to find preferred mirror site.") +message("Unable to download from preferred mirror site: ", mirrorUrl) } # step 3: use backup option @@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa success <- directDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) if (success) { -return(packageLocalPath) +return() } els
spark git commit: [SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are based on HadoopRDD or NewHadoopRDD
Repository: spark Updated Branches: refs/heads/master f85f29608 -> d06172b88 [SPARK-19223][SQL][PYSPARK] Fix InputFileBlockHolder for datasources which are based on HadoopRDD or NewHadoopRDD ## What changes were proposed in this pull request? For some datasources which are based on HadoopRDD or NewHadoopRDD, such as spark-xml, InputFileBlockHolder doesn't work with Python UDFs. To reproduce it, run the following code with `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`:

```
from pyspark.sql.functions import udf, input_file_name
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

def filename(path):
    return path

session = SparkSession.builder.appName('APP').getOrCreate()
session.udf.register('sameText', filename)
sameText = udf(filename, StringType())
df = session.read.format('xml').load('a.xml', rowTag='root').select('*', input_file_name().alias('file'))
df.select('file').show()  # works
df.select(sameText(df['file'])).show()  # returns empty content
```

The issue is that in `HadoopRDD` and `NewHadoopRDD` we set the file block's info in `InputFileBlockHolder` before the returned iterator begins consuming. `InputFileBlockHolder` records this info in a thread-local variable. When running a Python UDF in batch, we set up another thread to consume the iterator from the child plan's output RDD, so we can't read the info back from that other thread. To fix this, we have to set the info in `InputFileBlockHolder` after the iterator begins consuming, so the info can be read in the correct thread. ## How was this patch tested? Manual test with the above example code for the spark-xml package on pyspark: `bin/pyspark --packages com.databricks:spark-xml_2.11:0.4.1`. Added a pyspark test. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh Closes #16585 from viirya/fix-inputfileblock-hadooprdd. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d06172b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d06172b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d06172b8 Branch: refs/heads/master Commit: d06172b88e61c0f79e3dea5703a17c6ae590f248 Parents: f85f296 Author: Liang-Chi Hsieh Authored: Wed Jan 18 23:06:44 2017 +0800 Committer: Wenchen Fan Committed: Wed Jan 18 23:06:44 2017 +0800 -- .../apache/spark/rdd/InputFileBlockHolder.scala | 7 +++--- python/pyspark/sql/tests.py | 24 2 files changed, 28 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d06172b8/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala index 9ba476d..ff2f58d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala +++ b/core/src/main/scala/org/apache/spark/rdd/InputFileBlockHolder.scala @@ -41,9 +41,10 @@ private[spark] object InputFileBlockHolder { * The thread variable for the name of the current file being read. This is used by * the InputFileName function in Spark SQL.
*/ - private[this] val inputBlock: ThreadLocal[FileBlock] = new ThreadLocal[FileBlock] { -override protected def initialValue(): FileBlock = new FileBlock - } + private[this] val inputBlock: InheritableThreadLocal[FileBlock] = +new InheritableThreadLocal[FileBlock] { + override protected def initialValue(): FileBlock = new FileBlock +} /** * Returns the holding file name or empty string if it is unknown. http://git-wip-us.apache.org/repos/asf/spark/blob/d06172b8/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a825028..73a5df6 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -435,6 +435,30 @@ class SQLTests(ReusedPySparkTestCase): row = self.spark.read.json(filePath).select(sourceFile(input_file_name())).first() self.assertTrue(row[0].find("people1.json") != -1) +def test_udf_with_input_file_name_for_hadooprdd(self): +from pyspark.sql.functions import udf, input_file_name +from pyspark.sql.types import StringType + +def filename(path): +return path + +sameText = udf(filename, StringType()) + +rdd = self.sc.textFile('python/test_support/sql/people.json') +df = self.spark.read.json(rdd).select(input_file_name().alias('file')) +row = df.select(sameText(df['file'])).first()
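Editor's note: the root cause is the semantics of `ThreadLocal` versus `InheritableThreadLocal` on the JVM — only the latter propagates a parent thread's value to threads it spawns, which is what the separate Python-UDF writer thread relies on. A small stand-alone demonstration (not Spark code):

```scala
// Hedged demo of the ThreadLocal vs. InheritableThreadLocal difference.
val plain = new ThreadLocal[String] { override def initialValue(): String = "unset" }
val inheritable = new InheritableThreadLocal[String] { override def initialValue(): String = "unset" }

plain.set("people.json")
inheritable.set("people.json")

val child = new Thread(new Runnable {
  def run(): Unit = {
    println(s"plain:       ${plain.get()}")       // prints "unset"
    println(s"inheritable: ${inheritable.get()}") // prints "people.json"
  }
})
child.start()
child.join()
```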
spark git commit: [SPARK-19024][SQL] Implement new approach to write a permanent view
Repository: spark Updated Branches: refs/heads/master 17ce0b5b3 -> f85f29608 [SPARK-19024][SQL] Implement new approach to write a permanent view ## What changes were proposed in this pull request? On CREATE/ALTER a view, we no longer need to generate a SQL text string from the LogicalPlan; instead we store the SQL query text, the output column names of the query plan, and the current database in CatalogTable. Permanent views created by this approach can be resolved by the current view resolution approach. The main advantages include: 1. If you update an underlying view, the current view also gets updated; 2. That gives us a chance to get rid of SQL generation for operators. Major changes of this PR: 1. Generate the view-specific properties (e.g. view default database, view query output column names) during permanent view creation and store them as properties in the CatalogTable; 2. Update the commands `CreateViewCommand` and `AlterViewAsCommand`, get rid of SQL generation from them. ## How was this patch tested? Existing tests. Author: jiangxingbo Closes #16613 from jiangxb1987/view-write-path. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f85f2960 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f85f2960 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f85f2960 Branch: refs/heads/master Commit: f85f29608de801d7cacc779a77c8edaed8124acf Parents: 17ce0b5 Author: jiangxingbo Authored: Wed Jan 18 19:13:01 2017 +0800 Committer: Wenchen Fan Committed: Wed Jan 18 19:13:01 2017 +0800 -- .../spark/sql/catalyst/catalog/interface.scala | 19 --- .../spark/sql/execution/command/views.scala | 164 +-- .../org/apache/spark/sql/SQLQuerySuite.scala| 10 +- .../spark/sql/hive/execution/HiveDDLSuite.scala | 29 ++-- .../spark/sql/hive/execution/SQLViewSuite.scala | 16 +- 5 files changed, 146 insertions(+), 92 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 2adccdd..80d3282 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -223,25 +223,6 @@ case class CatalogTable( ) } - /** - * Insert/Update the view query output column names in `properties`. - */ - def withQueryColumnNames(columns: Seq[String]): CatalogTable = { -val props = new mutable.HashMap[String, String] -if (columns.nonEmpty) { - props.put(VIEW_QUERY_OUTPUT_NUM_COLUMNS, columns.length.toString) - columns.zipWithIndex.foreach { case (colName, index) => -props.put(s"$VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX$index", colName) - } -} - -// We can't use `filterKeys` here, as the map returned by `filterKeys` is not serializable, -// while `CatalogTable` should be serializable. -copy(properties = properties.filterNot { case (key, _) => - key.startsWith(VIEW_QUERY_OUTPUT_PREFIX) -} ++ props) - } - /** Syntactic sugar to update a field in `storage`.
*/ def withNewStorage( locationUri: Option[String] = storage.locationUri, http://git-wip-us.apache.org/repos/asf/spark/blob/f85f2960/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 154141b..3da4bcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -17,10 +17,10 @@ package org.apache.spark.sql.execution.command -import scala.util.control.NonFatal +import scala.collection.mutable import org.apache.spark.sql.{AnalysisException, Row, SparkSession} -import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedFunction, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Alias @@ -64,9 +64,9 @@ object PersistedView extends ViewType /** - * Create or replace a view with given query plan. This command will convert the query plan to - * canonicalized SQL s
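Editor's note: to make advantage (1) concrete, here is a hedged SQL-level illustration of the scenario the PR describes. The table and view names are hypothetical, `spark` is an assumed session, and the exact semantics depend on the resolution rules this patch introduces.

```scala
// Hedged sketch: updating an underlying view is visible to a view defined on
// top of it, because the outer view now stores its SQL text and is
// re-resolved at read time instead of keeping a stale expanded plan.
spark.sql("CREATE TABLE t (id INT, name STRING) USING parquet")
spark.sql("CREATE VIEW inner_v AS SELECT id FROM t")
spark.sql("CREATE VIEW outer_v AS SELECT id FROM inner_v")

// Redefine the underlying view; outer_v picks this up on its next read.
spark.sql("ALTER VIEW inner_v AS SELECT id FROM t WHERE id > 0")
spark.sql("SELECT * FROM outer_v").show()
```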
spark git commit: [SPARK-18782][BUILD] Bump Hadoop 2.6 version to use Hadoop 2.6.5
Repository: spark Updated Branches: refs/heads/master eefdf9f9d -> 17ce0b5b3 [SPARK-18782][BUILD] Bump Hadoop 2.6 version to use Hadoop 2.6.5 **What changes were proposed in this pull request?** Use Hadoop 2.6.5 for the Hadoop 2.6 profile; the release notes list a number of fixes, including security fixes, that we should pick up. **How was this patch tested?** Running the unit tests with IBM's SDK for Java, and we will see what happens with OpenJDK in the community builder; no trouble is expected as this is only a minor release. Author: Adam Roberts Closes #16616 from a-roberts/Hadoop265Bumper. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17ce0b5b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17ce0b5b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17ce0b5b Branch: refs/heads/master Commit: 17ce0b5b3f6a825fc77458bc8608cece1a6019c7 Parents: eefdf9f Author: Adam Roberts Authored: Wed Jan 18 09:46:34 2017 + Committer: Sean Owen Committed: Wed Jan 18 09:46:34 2017 + -- dev/deps/spark-deps-hadoop-2.6 | 30 +++--- pom.xml| 2 +- 2 files changed, 16 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/17ce0b5b/dev/deps/spark-deps-hadoop-2.6 -- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index 7b9383d..fbd5d88 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -59,21 +59,21 @@ gson-2.2.4.jar guava-14.0.1.jar guice-3.0.jar guice-servlet-3.0.jar -hadoop-annotations-2.6.4.jar -hadoop-auth-2.6.4.jar -hadoop-client-2.6.4.jar -hadoop-common-2.6.4.jar -hadoop-hdfs-2.6.4.jar -hadoop-mapreduce-client-app-2.6.4.jar -hadoop-mapreduce-client-common-2.6.4.jar -hadoop-mapreduce-client-core-2.6.4.jar -hadoop-mapreduce-client-jobclient-2.6.4.jar -hadoop-mapreduce-client-shuffle-2.6.4.jar -hadoop-yarn-api-2.6.4.jar -hadoop-yarn-client-2.6.4.jar -hadoop-yarn-common-2.6.4.jar -hadoop-yarn-server-common-2.6.4.jar -hadoop-yarn-server-web-proxy-2.6.4.jar +hadoop-annotations-2.6.5.jar +hadoop-auth-2.6.5.jar +hadoop-client-2.6.5.jar +hadoop-common-2.6.5.jar +hadoop-hdfs-2.6.5.jar +hadoop-mapreduce-client-app-2.6.5.jar +hadoop-mapreduce-client-common-2.6.5.jar +hadoop-mapreduce-client-core-2.6.5.jar +hadoop-mapreduce-client-jobclient-2.6.5.jar +hadoop-mapreduce-client-shuffle-2.6.5.jar +hadoop-yarn-api-2.6.5.jar +hadoop-yarn-client-2.6.5.jar +hadoop-yarn-common-2.6.5.jar +hadoop-yarn-server-common-2.6.5.jar +hadoop-yarn-server-web-proxy-2.6.5.jar hk2-api-2.4.0-b34.jar hk2-locator-2.4.0-b34.jar hk2-utils-2.4.0-b34.jar http://git-wip-us.apache.org/repos/asf/spark/blob/17ce0b5b/pom.xml -- diff --git a/pom.xml b/pom.xml index ff37a88..fdc1917 100644 --- a/pom.xml +++ b/pom.xml @@ -2571,7 +2571,7 @@ hadoop-2.6 -2.6.4 +2.6.5 0.9.3 3.4.6 2.6.0
spark git commit: [SPARK-19227][SPARK-19251] remove unused imports and outdated comments
Repository: spark Updated Branches: refs/heads/master 4494cd971 -> eefdf9f9d [SPARK-19227][SPARK-19251] remove unused imports and outdated comments ## What changes were proposed in this pull request? remove unused imports and outdated comments, and fix some minor code style issues. ## How was this patch tested? existing unit tests Author: uncleGen Closes #16591 from uncleGen/SPARK-19227. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eefdf9f9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eefdf9f9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eefdf9f9 Branch: refs/heads/master Commit: eefdf9f9dd8afde49ad7d4e230e2735eb817ab0a Parents: 4494cd9 Author: uncleGen Authored: Wed Jan 18 09:44:32 2017 + Committer: Sean Owen Committed: Wed Jan 18 09:44:32 2017 + -- .../scala/org/apache/spark/SecurityManager.scala | 2 -- .../main/scala/org/apache/spark/SparkContext.scala | 2 +- .../deploy/ExternalShuffleServiceSource.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../apache/spark/deploy/worker/ui/LogPage.scala| 2 -- .../apache/spark/internal/config/ConfigEntry.scala | 17 + .../spark/internal/config/ConfigReader.scala | 1 - .../scala/org/apache/spark/rpc/RpcTimeout.scala| 5 ++--- .../org/apache/spark/scheduler/ResultTask.scala| 1 - .../apache/spark/scheduler/ShuffleMapTask.scala| 1 - .../scala/org/apache/spark/scheduler/Stage.scala | 1 - .../scala/org/apache/spark/scheduler/Task.scala| 1 - .../org/apache/spark/serializer/Serializer.scala | 1 - .../spark/serializer/SerializerManager.scala | 1 - .../spark/status/api/v1/ExecutorListResource.scala | 2 +- .../org/apache/spark/storage/StorageLevel.scala| 2 +- .../apache/spark/util/random/RandomSampler.scala | 1 - .../apache/spark/InternalAccumulatorSuite.scala| 1 - .../netty/NettyBlockTransferServiceSuite.scala | 1 - .../org/apache/spark/util/DistributionSuite.scala | 4 .../spark/examples/ml/BinarizerExample.scala | 2 +- .../spark/examples/sql/SparkSQLExample.scala | 4 .../sql/streaming/StructuredNetworkWordCount.scala | 1 - .../apache/spark/sql/kafka010/KafkaSource.scala| 2 +- .../apache/spark/streaming/kafka010/KafkaRDD.scala | 2 +- .../spark/streaming/kafka010/KafkaUtils.scala | 1 - .../examples/streaming/KinesisWordCountASL.scala | 2 +- .../streaming/kinesis/KinesisCheckpointer.scala| 2 +- .../kinesis/KinesisCheckpointerSuite.scala | 3 +-- .../launcher/SparkSubmitOptionParserSuite.java | 3 --- .../spark/ml/classification/Classifier.scala | 2 +- .../spark/ml/classification/GBTClassifier.scala| 1 - .../spark/ml/source/libsvm/LibSVMRelation.scala| 2 +- .../org/apache/spark/repl/SparkCommandLine.scala | 4 ++-- .../org/apache/spark/repl/SparkILoopInit.scala | 2 -- .../deploy/yarn/ApplicationMasterArguments.scala | 2 -- .../security/HadoopFSCredentialProviderSuite.scala | 3 +-- .../expressions/MonotonicallyIncreasingID.scala| 1 - .../aggregate/ApproximatePercentile.scala | 1 - .../expressions/aggregate/Percentile.scala | 1 - .../catalyst/expressions/aggregate/collect.scala | 2 -- .../sql/catalyst/expressions/jsonExpressions.scala | 1 - .../sql/catalyst/expressions/nullExpressions.scala | 2 +- .../spark/sql/catalyst/planning/patterns.scala | 4 .../spark/sql/catalyst/plans/joinTypes.scala | 1 - .../plans/logical/EventTimeWatermark.scala | 2 +- .../apache/spark/sql/hive/MetastoreRelation.scala | 1 - 47 files changed, 25 insertions(+), 79 deletions(-) --
http://git-wip-us.apache.org/repos/asf/spark/blob/eefdf9f9/core/src/main/scala/org/apache/spark/SecurityManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 87fe563..9bdc509 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -26,11 +26,9 @@ import javax.net.ssl._ import com.google.common.hash.HashCodes import com.google.common.io.Files import org.apache.hadoop.io.Text -import org.apache.hadoop.security.Credentials import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ import org.apache.spark.network.sasl.SecretKeyHolder import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/eefdf9f9/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff -