[GitHub] spark pull request #19861: [SPARK-22387][SQL] Propagate session configs to d...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19861#discussion_r157140363 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import java.util.regex.Pattern + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport} + +private[sql] object DataSourceV2Utils extends Logging { + + /** + * Helper method that extracts and transforms session configs into k/v pairs, the k/v pairs will + * be used to create data source options. + * Only extract when `ds` implements [[SessionConfigSupport]], in this case we may fetch the + * specified key-prefix from `ds`, and extract session configs with config keys that start with + * `spark.datasource.$keyPrefix`. A session config `spark.datasource.$keyPrefix.xxx -> yyy` will + * be transformed into `xxx -> yyy`. 
+ * + * @param ds a [[DataSourceV2]] object + * @param conf the session conf + * @return an immutable map that contains all the extracted and transformed k/v pairs. + */ + def extractSessionConfigs(ds: DataSourceV2, conf: SQLConf): Map[String, String] = ds match { +case cs: SessionConfigSupport => + val keyPrefix = cs.keyPrefix() + require(keyPrefix != null, "The data source config key prefix can't be null.") + + val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.*)") --- End diff -- nit: `(.*)` -> `(.+)`, just to forbid corner cases like `spark.datasource.$keyPrefix.` (a key with nothing after the prefix). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
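The effect of the `(.*)` -> `(.+)` nit can be checked in isolation. The sketch below is a simplified, Spark-free stand-in for `extractSessionConfigs`, assuming a plain `Map[String, String]` in place of `SQLConf` and a key prefix passed in directly instead of being read from a `SessionConfigSupport` data source; `SessionConfigSketch` is a hypothetical name, not the code in the PR.

```scala
import java.util.regex.Pattern

object SessionConfigSketch {
  // Hypothetical stand-in for DataSourceV2Utils.extractSessionConfigs: `conf` is a plain
  // Map instead of SQLConf, and `keyPrefix` is given directly instead of coming from
  // a SessionConfigSupport data source.
  def extractSessionConfigs(keyPrefix: String, conf: Map[String, String]): Map[String, String] = {
    require(keyPrefix != null, "The data source config key prefix can't be null.")
    // `(.+)` instead of `(.*)`: a bare "spark.datasource.<prefix>." key has an empty
    // suffix, so it no longer matches and is not turned into an option with key "".
    val pattern = Pattern.compile(s"^spark\\.datasource\\.$keyPrefix\\.(.+)")
    conf.iterator.flatMap { case (key, value) =>
      val m = pattern.matcher(key)
      if (m.matches()) Iterator.single(m.group(1) -> value) else Iterator.empty
    }.toMap
  }
}
```

With `(.*)`, a session config such as `spark.datasource.myds. -> dangling` would have produced an option with an empty key; with `(.+)` it is simply skipped, while `spark.datasource.myds.user -> alice` still becomes `user -> alice`.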
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19861 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84941/ Test PASSed.
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19861 Merged build finished. Test PASSed.
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19861 **[Test build #84941 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84941/testReport)** for PR 19861 at commit [`f7d5a4d`](https://github.com/apache/spark/commit/f7d5a4dfce26f2d8d79f8b2529b9676fdf03c917). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class DataSourceV2UtilsSuite extends SparkFunSuite `
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157137789 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala --- @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { */ val cacheManager: CacheManager = new CacheManager + /** + * A status store to query SQL status/metrics of this Spark application, based on SQL-specific + * [[org.apache.spark.scheduler.SparkListenerEvent]]s. + */ + val statusStore: SQLAppStatusStore = { --- End diff -- +1
[GitHub] spark issue #19987: [Test][WIP] add passing test.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19987 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84944/ Test PASSed.
[GitHub] spark issue #19987: [Test][WIP] add passing test.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19987 **[Test build #84944 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84944/testReport)** for PR 19987 at commit [`6b6d3fb`](https://github.com/apache/spark/commit/6b6d3fbdd8359ba67addb2123e995df61dba095c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19987: [Test][WIP] add passing test.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19987 Merged build finished. Test PASSed.
[GitHub] spark issue #19986: [Test][WIP] add failing test.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19986 **[Test build #84945 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84945/testReport)** for PR 19986 at commit [`93d29a8`](https://github.com/apache/spark/commit/93d29a82847a3c8d3abc9db9cbb31c0d43d69da0). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19986: [Test][WIP] add failing test.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19986 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84945/ Test FAILed.
[GitHub] spark issue #19986: [Test][WIP] add failing test.
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19986 Merged build finished. Test FAILed.
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157136615 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala --- @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { */ val cacheManager: CacheManager = new CacheManager + /** + * A status store to query SQL status/metrics of this Spark application, based on SQL-specific + * [[org.apache.spark.scheduler.SparkListenerEvent]]s. + */ + val statusStore: SQLAppStatusStore = { --- End diff -- +1, this is a behavior change. It was useful to have `SharedState.listener` and check metrics from it.
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157136099 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala --- @@ -489,16 +501,17 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe } -class SQLListenerMemoryLeakSuite extends SparkFunSuite { +class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite { - // TODO: this feature is not yet available in SQLAppStatusStore. - ignore("no memory leak") { + test("no memory leak") { quietly { val conf = new SparkConf() .setMaster("local") .setAppName("test") +.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) // Update the UI data immediately .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly -.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly +// TODO: this feature is not yet available in SQLAppStatusStore. +// .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly --- End diff -- Instead of totally disabling this test because of an unimplemented feature, I'd like to still run it, just a little more slowly.
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157135896 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala --- @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -bus.postToAll(SparkListenerSQLExecutionStart( +listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -bus.postToAll(SparkListenerJobStart( +listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) -bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(store.executionMetrics(0).isEmpty) +assert(statusStore.executionMetrics(executionId).isEmpty) -bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. 
-bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L +listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) -bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics -bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) -bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -bus.postToAll(SparkListenerTaskEnd( +listener.onTaskEnd(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), null)) -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Finish two tasks -bus.postToAll(SparkListenerTaskEnd( +listener.onTaskEnd(SparkListenerTaskEnd( 
stageId = 0, stageAttemptId = 1, taskType = "", reason = null, createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)), null)) -bus.postToAll(SparkListenerTaskEnd( +listener.onTaskEnd(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 1, taskType = "", reason = null, createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), null)) -
[GitHub] spark pull request #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAnd...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19941
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157135580 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala --- @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -bus.postToAll(SparkListenerSQLExecutionStart( +listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -bus.postToAll(SparkListenerJobStart( +listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) -bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(store.executionMetrics(0).isEmpty) +assert(statusStore.executionMetrics(executionId).isEmpty) -bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. 
-bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L +listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) -bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics -bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) -bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -bus.postToAll(SparkListenerTaskEnd( +listener.onTaskEnd(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), null)) -checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Finish two tasks -bus.postToAll(SparkListenerTaskEnd( +listener.onTaskEnd(SparkListenerTaskEnd( 
stageId = 0, stageAttemptId = 1, taskType = "", reason = null, createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)), null)) -bus.postToAll(SparkListenerTaskEnd( +listener.onTaskEnd(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 1, taskType = "", reason = null, createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), null)) -
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19941 Thanks! Merged to master.
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157135495 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala --- @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -bus.postToAll(SparkListenerSQLExecutionStart( +listener.onOtherEvent(SparkListenerSQLExecutionStart( --- End diff -- I prefer the testing style from before #19681, which just calls the event-handling methods of the listener directly instead of going through an intermediate replay bus.
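The two testing styles can be contrasted without Spark. In this sketch, `JobListener`, `CountingListener`, `SimpleBus`, `JobStart` and `JobEnd` are hypothetical types, not Spark classes: the direct style invokes a handler and asserts on the listener immediately, while the bus style reaches the same handler only through an extra dispatcher that the test also has to set up.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free model of a listener with one handler per event type.
trait JobListener {
  def onJobStart(jobId: Int): Unit
  def onJobEnd(jobId: Int): Unit
}

// Records which jobs are currently running.
class CountingListener extends JobListener {
  val running: mutable.Set[Int] = mutable.Set.empty[Int]
  override def onJobStart(jobId: Int): Unit = running += jobId
  override def onJobEnd(jobId: Int): Unit = running -= jobId
}

// Hypothetical events and a bus that dispatches each event to every registered listener.
sealed trait Event
final case class JobStart(jobId: Int) extends Event
final case class JobEnd(jobId: Int) extends Event

class SimpleBus {
  private val listeners = mutable.Buffer.empty[JobListener]
  def addListener(l: JobListener): Unit = listeners += l
  def postToAll(event: Event): Unit = listeners.foreach { l =>
    event match {
      case JobStart(id) => l.onJobStart(id)
      case JobEnd(id)   => l.onJobEnd(id)
    }
  }
}
```

Both styles end in the same listener state here; the argument in the comment is that the direct call keeps the test focused on the listener itself rather than on the dispatch layer.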
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157135337 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java --- @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) { public static Trigger Once() { return OneTimeTrigger$.MODULE$; } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * @since 2.3.0 + */ + public static Trigger Continuous(long intervalMs) { +return ContinuousTrigger.apply(intervalMs); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import java.util.concurrent.TimeUnit + *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) --- End diff -- `Trigger.Continuous(10, TimeUnit.SECONDS)` instead of `ProcessingTime.create(10, TimeUnit.SECONDS)`?
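The suggested `Trigger.Continuous(10, TimeUnit.SECONDS)` form implies an overload taking an interval plus a `java.util.concurrent.TimeUnit`. A minimal sketch, assuming such an overload simply normalizes its arguments to the millisecond value accepted by `Continuous(long intervalMs)`; `ContinuousIntervalSketch` and `toIntervalMs` are hypothetical names, not Spark API, and the non-negative check is an assumption of this sketch.

```scala
import java.util.concurrent.TimeUnit

object ContinuousIntervalSketch {
  // Hypothetical helper: convert (interval, unit) into the millisecond value that a
  // `Continuous(long intervalMs)`-style factory expects.
  def toIntervalMs(interval: Long, unit: TimeUnit): Long = {
    // Assumed validation: a checkpoint interval should not be negative.
    require(interval >= 0, s"Interval cannot be negative: $interval")
    unit.toMillis(interval)
  }
}
```

Under this assumption, `(10, TimeUnit.SECONDS)` and a plain `10000` millisecond interval describe the same checkpoint cadence.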
[GitHub] spark pull request #19985: [SPARK-22791] [SQL] [SS] Redact Output of Explain
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19985#discussion_r157135279 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -418,6 +418,35 @@ class StreamSuite extends StreamTest { assert(OutputMode.Update === InternalOutputModes.Update) } + override protected def sparkConf: SparkConf = super.sparkConf +.set("spark.redaction.string.regex", "file:/[\\w_]+") + + test("explain - redaction") { --- End diff -- This is just a Structured Streaming (SS) example. I believe we have more such cases.
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157135252 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala --- @@ -36,14 +36,23 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.status.config._ +import org.apache.spark.status.config.LIVE_ENTITY_UPDATE_PERIOD import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} import org.apache.spark.util.kvstore.InMemoryStore -class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { + +class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ - override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + override def beforeAll(): Unit = { +super.beforeAll() +sparkContext.conf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) --- End diff -- Only set this config for this suite.
[GitHub] spark issue #19986: [Test][WIP] add failing test.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19986 **[Test build #84945 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84945/testReport)** for PR 19986 at commit [`93d29a8`](https://github.com/apache/spark/commit/93d29a82847a3c8d3abc9db9cbb31c0d43d69da0).
[GitHub] spark issue #19985: [SPARK-22791] [SQL] [SS] Redact Output of Explain
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19985 cc @marmbrus @hvanhovell @zsxwing
[GitHub] spark issue #19987: [Test][WIP] add passing test.
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19987 **[Test build #84944 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84944/testReport)** for PR 19987 at commit [`6b6d3fb`](https://github.com/apache/spark/commit/6b6d3fbdd8359ba67addb2123e995df61dba095c).
[GitHub] spark issue #19985: [SPARK-22791] [SQL] [SS] Redact Output of Explain
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19985 **[Test build #84946 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84946/testReport)** for PR 19985 at commit [`8f44edf`](https://github.com/apache/spark/commit/8f44edf5a4edcc8f1c42331cf3ab9b694fb01925).
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157135091 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala --- @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { */ val cacheManager: CacheManager = new CacheManager + /** + * A status store to query SQL status/metrics of this Spark application, based on SQL-specific + * [[org.apache.spark.scheduler.SparkListenerEvent]]s. + */ + val statusStore: SQLAppStatusStore = { --- End diff -- I just realized this is important. Previously we could use `SharedState.listener` to query the SQL status. `SharedState` is actually a semi-public interface, as it's marked as `Unstable` in `SparkSession.sharedState`. Sometimes, for debugging, I just type `spark.sharedState.listener.xxx` in the Spark shell and check some status, but that's impossible now after #19681. There might be other people like me who rely on this ability, so we should not just remove it for the sake of future UI discoverability (I think that only covers SQL and streaming, which is really not a big gain). cc @hvanhovell @viirya @gatorsmile
[GitHub] spark pull request #19987: [Test][WIP] add passing test.
GitHub user MrBago opened a pull request: https://github.com/apache/spark/pull/19987 [Test][WIP] add passing test. Missing import in python/pyspark/ml/tests.py, verifying with CI. You can merge this pull request into a Git repository by running: $ git pull https://github.com/MrBago/spark pass-pyspark-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19987.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19987 commit 6b6d3fbdd8359ba67addb2123e995df61dba095c Author: Bago Amirbekian Date: 2017-12-15T07:00:17Z add passing test.
[GitHub] spark pull request #19985: [SPARK-22791] [SQL] [SS] Redact Output of Explain
GitHub user gatorsmile opened a pull request: https://github.com/apache/spark/pull/19985 [SPARK-22791] [SQL] [SS] Redact Output of Explain

## What changes were proposed in this pull request?

When calling explain on a query, the output can contain sensitive information, such as the checkpoint file paths in the plan below. We should provide a way for an admin/user to redact such information.

```
== Physical Plan ==
*HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#12L])
+- StateStoreSave [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw90gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
      +- StateStoreRestore [value#6], state info [ checkpoint = file:/private/var/folders/vx/j0ydl5rn0gd9mgrh1pljnw90gn/T/temporary-91c6fac0-609f-4bc8-ad57-52c189f06797/state, runId = 05a4b3af-f02c-40f8-9ff9-a3e18bae496f, opId = 0, ver = 0, numPartitions = 5]
         +- *HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#18L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#18L])
                  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *MapElements , obj#5: java.lang.String
                        +- *DeserializeToObject value#30.toString, obj#4: java.lang.String
                           +- LocalTableScan [value#30]
```

## How was this patch tested?
Added a test case. You can merge this pull request into a Git repository by running: $ git pull https://github.com/gatorsmile/spark redactPlan Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19985.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19985 commit 8f44edf5a4edcc8f1c42331cf3ab9b694fb01925 Author: gatorsmile Date: 2017-12-15T06:56:32Z fix.
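One way to picture the proposed redaction: every match of the configured regex (the PR's test sets `spark.redaction.string.regex` to `file:/[\\w_]+`) is replaced in the explain string. This is a standalone sketch, not the patch itself; the `RedactionSketch` and `redactPlan` names and the replacement text `*********(redacted)` are assumptions for illustration.

```scala
import scala.util.matching.Regex

object RedactionSketch {
  // Assumed placeholder text; the string actually used by the patch is not shown here.
  val Replacement = "*********(redacted)"

  // Hypothetical helper: replace every match of `regex` in `text`, or return `text`
  // unchanged when no redaction regex is configured.
  def redactPlan(regex: Option[Regex], text: String): String =
    regex.fold(text)(r => r.replaceAllIn(text, Regex.quoteReplacement(Replacement)))
}
```

With this particular regex only the first path segment after `file:/` is masked (for example `file:/private/var/...` becomes `*********(redacted)/var/...`), because the character class `[\w_]` does not match `/`.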
[GitHub] spark pull request #19986: [Test][WIP] add failing test.
GitHub user MrBago opened a pull request: https://github.com/apache/spark/pull/19986 [Test][WIP] add failing test. Missing import in python/pyspark/ml/tests.py, verifying with CI. You can merge this pull request into a Git repository by running: $ git pull https://github.com/MrBago/spark fail-pyspark-tests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19986.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19986 commit 93d29a82847a3c8d3abc9db9cbb31c0d43d69da0 Author: Bago Amirbekian Date: 2017-12-15T07:00:17Z add failing test.
[GitHub] spark pull request #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suit...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19982
[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157134291 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala --- @@ -25,21 +25,17 @@ import scala.collection.mutable.ArrayBuffer import com.fasterxml.jackson.databind.annotation.JsonDeserialize -import org.apache.spark.{JobExecutionStatus, SparkConf} -import org.apache.spark.scheduler.SparkListener -import org.apache.spark.status.AppStatusPlugin +import org.apache.spark.JobExecutionStatus import org.apache.spark.status.KVUtils.KVIndexParam -import org.apache.spark.ui.SparkUI -import org.apache.spark.util.Utils import org.apache.spark.util.kvstore.KVStore /** * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's * no state kept in this class, so it's ok to have multiple instances of it in an application. */ -private[sql] class SQLAppStatusStore( +class SQLAppStatusStore( --- End diff -- Following the existing convention, classes under the `execution` package are meant to be private and don't need `private[sql]`.
[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19981 **[Test build #84943 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84943/testReport)** for PR 19981 at commit [`88fdff2`](https://github.com/apache/spark/commit/88fdff29cc106fd583640f8026dafd278acc9ec9).
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19982 Thanks! Merged to master.
[GitHub] spark pull request #19946: [SPARK-22648] [Scheduler] Spark on Kubernetes - D...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19946#discussion_r157133964 --- Diff: docs/running-on-kubernetes.md --- @@ -0,0 +1,502 @@ +--- +layout: global +title: Running Spark on Kubernetes +--- +* This will become a table of contents (this text will be scraped). +{:toc} + +Spark can run on clusters managed by [Kubernetes](https://kubernetes.io). This feature makes use of native +Kubernetes scheduler that has been added to Spark. + +# Prerequisites + +* A runnable distribution of Spark 2.3 or above. +* A running Kubernetes cluster at version >= 1.6 with access configured to it using +[kubectl](https://kubernetes.io/docs/user-guide/prereqs/). If you do not already have a working Kubernetes cluster, +you may setup a test cluster on your local machine using +[minikube](https://kubernetes.io/docs/getting-started-guides/minikube/). + * We recommend using the latest release of minikube with the DNS addon enabled. +* You must have appropriate permissions to list, create, edit and delete +[pods](https://kubernetes.io/docs/user-guide/pods/) in your cluster. You can verify that you can list these resources +by running `kubectl auth can-ipods`. + * The service account credentials used by the driver pods must be allowed to create pods, services and configmaps. +* You must have [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/) configured in your cluster. + +# How it works + + + + + +spark-submit can be directly used to submit a Spark application to a Kubernetes cluster. +The submission mechanism works as follows: + +* Spark creates a Spark driver running within a [Kubernetes pod](https://kubernetes.io/docs/concepts/workloads/pods/pod/). +* The driver creates executors which are also running within Kubernetes pods and connects to them, and executes application code. 
+* When the application completes, the executor pods terminate and are cleaned up, but the driver pod persists +logs and remains in "completed" state in the Kubernetes API until it's eventually garbage collected or manually cleaned up. + +Note that in the completed state, the driver pod does *not* use any computational or memory resources. + +The driver and executor pod scheduling is handled by Kubernetes. It will be possible to affect Kubernetes scheduling +decisions for driver and executor pods using advanced primitives like +[node selectors](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector) +and [node/pod affinities](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity) +in a future release. + +# Submitting Applications to Kubernetes + +## Docker Images + +Kubernetes requires users to supply images that can be deployed into containers within pods. The images are built to +be run in a container runtime environment that Kubernetes supports. Docker is a container runtime environment that is --- End diff -- I wonder if k8s can tell the container runtime from the image name? If it can, we can use `container.image` here, but otherwise, I guess we need another config like `xxx.container` to tell the container runtime and `xxx.${container}.image` to specify the image, e.g. `xxx.container=docker` and `xxx.docker.image=something`.
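A minimal sketch of how the two-level scheme suggested above could be resolved, assuming a plain `Map[String, String]` stands in for the Spark conf. The `xxx` prefix and both key shapes are the hypothetical placeholders from the comment, not real Spark configs, and `ContainerImageConf` is an illustrative name:

```scala
// Hypothetical resolver for the proposed keys:
//   <prefix>.container       = docker
//   <prefix>.<runtime>.image = something
// A plain Map stands in for SparkConf; none of these keys exist in Spark.
object ContainerImageConf {
  /** Returns (runtime, image) only if both keys are set, otherwise None. */
  def resolve(conf: Map[String, String], prefix: String): Option[(String, String)] =
    for {
      runtime <- conf.get(s"$prefix.container")       // which container runtime
      image   <- conf.get(s"$prefix.$runtime.image")  // image key scoped by that runtime
    } yield (runtime, image)
}
```

With `xxx.container=docker` and `xxx.docker.image=something`, `resolve(conf, "xxx")` yields `Some(("docker", "something"))`; if the runtime key is missing, or names a runtime with no matching image key, it yields `None`. Scoping the image key by runtime is what lets a second runtime be added later without renaming existing keys.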
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19982 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84940/ Test PASSed.
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19982 Merged build finished. Test PASSed.
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19982 **[Test build #84940 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84940/testReport)** for PR 19982 at commit [`2671416`](https://github.com/apache/spark/commit/2671416688ca6275556602b2f1990cd4361b95e6). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19941 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84939/ Test PASSed.
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19941 Merged build finished. Test PASSed.
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19941 **[Test build #84939 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84939/testReport)** for PR 19941 at commit [`5221c7c`](https://github.com/apache/spark/commit/5221c7cf11dc0accfcd1205177d0332bca042ffc). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157130281 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.ui.SparkUI +import org.apache.spark.status.config._ import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} - +import org.apache.spark.util.kvstore.InMemoryStore class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ - import org.apache.spark.AccumulatorSuite.makeInfo + + override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L) --- End diff -- The Spark context is shared by all test suites, so we should only set this conf to 0 in this suite.
[GitHub] spark issue #19864: [SPARK-22673][SQL] InMemoryRelation should utilize exist...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19864 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84938/ Test PASSed.
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157129866 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -listener.onOtherEvent(SparkListenerSQLExecutionStart( +bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -val executionUIData = listener.executionIdToData(0) - -listener.onJobStart(SparkListenerJobStart( +bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(listener.getExecutionMetrics(0).isEmpty) +assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. 
-listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, -createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -listener.onTaskEnd(SparkListenerTaskEnd( +bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 
0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100 + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0),
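To make the expected multipliers in the test above easier to follow, here is a hypothetical, simplified model of the semantics it asserts. It is not Spark's listener or `SQLAppStatusStore`, and the class and method names are illustrative only. The assumptions are: a task's newer update replaces its older one, per-task values are summed per accumulator, and submitting a new stage attempt discards everything reported by earlier attempts (so late reports from the first attempt are ignored).

```scala
import scala.collection.mutable

// Hypothetical model of the metric semantics the test checks; not Spark code.
// Accumulator updates are represented as Map(accumulatorId -> value).
class ExecutionMetricsModel {
  // stageId -> latest submitted attempt
  private val latestAttempt = mutable.Map.empty[Int, Int]
  // (stageId, attempt, taskId) -> most recent accumulator updates from that task
  private val taskUpdates = mutable.Map.empty[(Int, Int, Long), Map[Long, Long]]

  def stageSubmitted(stageId: Int, attempt: Int): Unit = {
    latestAttempt(stageId) = attempt
    // A retry resets the metrics: drop values reported by earlier attempts of this stage.
    taskUpdates.keys.toList
      .filter { case (s, a, _) => s == stageId && a != attempt }
      .foreach(taskUpdates.remove)
  }

  def metricsUpdate(taskId: Long, stageId: Int, attempt: Int, updates: Map[Long, Long]): Unit =
    // Reports from a stale attempt are ignored; a newer report replaces the task's older one.
    if (latestAttempt.get(stageId).contains(attempt)) {
      taskUpdates((stageId, attempt, taskId)) = updates
    }

  /** Sums each accumulator across the task reports that are still counted. */
  def executionMetrics: Map[Long, Long] =
    taskUpdates.values.toSeq.flatten
      .groupBy { case (accumId, _) => accumId }
      .map { case (accumId, pairs) => accumId -> pairs.map(_._2).sum }
}
```

Replaying the test's sequence against this model gives the same multipliers: two tasks reporting `x` give `2x`; task 1 then reporting `2x` gives `3x`; a new attempt followed by two `x` reports gives `2x` again, and a `100x` report from the old attempt leaves it at `2x`.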
[GitHub] spark issue #19864: [SPARK-22673][SQL] InMemoryRelation should utilize exist...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19864 Merged build finished. Test PASSed.
[GitHub] spark issue #19864: [SPARK-22673][SQL] InMemoryRelation should utilize exist...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19864 **[Test build #84938 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84938/testReport)** for PR 19864 at commit [`4b2fcb6`](https://github.com/apache/spark/commit/4b2fcb6a9fd61fe771ab323f41935541b0223bdf). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157129912 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -listener.onOtherEvent(SparkListenerSQLExecutionStart( +bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -val executionUIData = listener.executionIdToData(0) - -listener.onJobStart(SparkListenerJobStart( +bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(listener.getExecutionMetrics(0).isEmpty) +assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. 
-listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, -createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -listener.onTaskEnd(SparkListenerTaskEnd( +bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 
0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100 + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0),
[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19681#discussion_r157129757 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala --- @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap -listener.onOtherEvent(SparkListenerSQLExecutionStart( +bus.postToAll(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) -val executionUIData = listener.executionIdToData(0) - -listener.onJobStart(SparkListenerJobStart( +bus.postToAll(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) -assert(listener.getExecutionMetrics(0).isEmpty) +assert(store.executionMetrics(0).isEmpty) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. 
-listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 0, -createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo)) + (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) +bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) - listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( +bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) - (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)), - (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)) + (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), + (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt -listener.onTaskEnd(SparkListenerTaskEnd( +bus.postToAll(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 
0), - createTaskMetrics(accumulatorUpdates.mapValues(_ * 100 + createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), + null)) -checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) +checkAnswer(store.executionMetrics(0),
[GitHub] spark issue #19949: [SPARK-22762][TEST] Basic tests for IfCoercion and CaseW...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19949 **[Test build #84942 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84942/testReport)** for PR 19949 at commit [`8d04720`](https://github.com/apache/spark/commit/8d0472034e9267f7a6758b2ee9527f226e8f823e).
[GitHub] spark pull request #19811: [SPARK-18016][SQL] Code Generation: Constant Pool...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19811#discussion_r157127908 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala --- @@ -219,4 +219,31 @@ class GeneratedProjectionSuite extends SparkFunSuite { // - one is the mutableRow assert(globalVariables.length == 3) } + + test("SPARK-18016: generated projections on wider table requiring state compaction") { +val N = 6000 +val wideRow1 = new GenericInternalRow((0 until N).toArray[Any]) +val schema1 = StructType((1 to N).map(i => StructField("", IntegerType))) +val wideRow2 = new GenericInternalRow( + (0 until N).map(i => UTF8String.fromString(i.toString)).toArray[Any]) +val schema2 = StructType((1 to N).map(i => StructField("", StringType))) +val joined = new JoinedRow(wideRow1, wideRow2) +val joinedSchema = StructType(schema1 ++ schema2) +val nested = new JoinedRow(InternalRow(joined, joined), joined) +val nestedSchema = StructType( + Seq(StructField("", joinedSchema), StructField("", joinedSchema)) ++ joinedSchema) + +val safeProj = FromUnsafeProjection(nestedSchema) +val result = safeProj(nested) + +// test generated MutableProjection +val exprs = nestedSchema.fields.zipWithIndex.map { case (f, i) => + BoundReference(i, f.dataType, true) +} +val mutableProj = GenerateMutableProjection.generate(exprs) --- End diff -- Yea, the original PR has such a test, though. IIUC, state compaction in `GenerateMutableProjection` never happens?
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r157125671 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -50,15 +51,23 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} """) case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes { - override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) - override def dataType: DataType = StringType + private lazy val isBinaryMode = children.nonEmpty && children.forall(_.dataType == BinaryType) --- End diff -- `pg` and `hive` behave the same way here (concatenating binary and string inputs yields a string result): ``` postgres=# create table t1(a bytea, b bytea, c varchar, d varchar); postgres=# create view v1 as select a || b || c || d from t1; postgres=# \d v1 View "public.view41_1" Column | Type | Modifiers --+--+--- ?column? | text | hive> create table t1(a binary, b binary, c text, d test); hive> create view v1 as select a || b || c || d from t1; hive> describe v1; _c0 string ```
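The `isBinaryMode` rule in the quoted diff can be modelled in isolation: the result is binary only when there is at least one child and every child is binary; any string child makes the result a string, matching the PostgreSQL and Hive outputs above. This is a sketch with simplified, hypothetical type names (`SimpleType`, `BinaryT`, `StringT`), not Catalyst code. The `inputTypes` helper mirrors the PR's proposal to fill the expected input types from the same flag.

```scala
// Simplified stand-ins for Catalyst's BinaryType/StringType; illustrative only.
sealed trait SimpleType
case object BinaryT extends SimpleType
case object StringT extends SimpleType

object ConcatTypeRule {
  // Same shape as the quoted check: non-empty and all children binary.
  def isBinaryMode(children: Seq[SimpleType]): Boolean =
    children.nonEmpty && children.forall(_ == BinaryT)

  // Result type of the concatenation.
  def resultType(children: Seq[SimpleType]): SimpleType =
    if (isBinaryMode(children)) BinaryT else StringT

  // Every child is implicitly cast to the result type before concatenation.
  def inputTypes(children: Seq[SimpleType]): Seq[SimpleType] =
    Seq.fill(children.size)(resultType(children))
}
```

For example, `resultType(Seq(BinaryT, BinaryT, StringT, StringT))` is `StringT`, like the `a || b || c || d` views above, while two binary children stay `BinaryT`. An empty argument list falls back to `StringT` because of the `nonEmpty` guard.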
[GitHub] spark issue #19884: [WIP][SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19884 Just as a reminder about Jenkins: roughly a month ago, I happened to check what we have on a specific Jenkins machine (simply by printing out the versions within the PySpark tests): ``` PyPy - No Pandas Python 2.7 Pandas [0.16.0] Python 3.4 Pandas [0.19.2] ``` ``` PyPy - No PyArrow python 2.7 - No PyArrow Python 3.4 PyArrow [0.4.1] ``` I think we should also check which Python has the corresponding Pandas and PyArrow. Also, we dropped support for Pandas older than 0.19.2 per http://apache-spark-developers-list.1001551.n3.nabble.com/discuss-PySpark-Can-we-drop-support-old-Pandas-lt-0-19-2-or-what-version-should-we-support-td22834.html and https://github.com/apache/spark/pull/19607, so I think each Python should now have Pandas 0.19.2 or later, unless I have missed something.
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157124515 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java --- @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) { public static Trigger Once() { return OneTimeTrigger$.MODULE$; } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * @since 2.3.0 + */ + public static Trigger Continuous(long intervalMs) { +return ContinuousTrigger.apply(intervalMs); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import java.util.concurrent.TimeUnit + *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.3.0 + */ + public static Trigger Continuous(long interval, TimeUnit timeUnit) { +return ContinuousTrigger.create(interval, timeUnit); + } + + /** + * (Scala-friendly) + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import scala.concurrent.duration._ + *df.writeStream.trigger(Trigger.Continuous(10.seconds)) + * }}} + * @since 2.2.0 + */ + public static Trigger Continuous(Duration interval) { +return ContinuousTrigger.apply(interval); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *df.writeStream.trigger(Trigger.Continuous("10 seconds")) + * }}} + * @since 2.2.0 --- End diff -- 2.3.0?
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157124505 --- Diff: sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java --- @@ -95,4 +96,57 @@ public static Trigger ProcessingTime(String interval) { public static Trigger Once() { return OneTimeTrigger$.MODULE$; } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * @since 2.3.0 + */ + public static Trigger Continuous(long intervalMs) { +return ContinuousTrigger.apply(intervalMs); + } + + /** + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import java.util.concurrent.TimeUnit + *df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.3.0 + */ + public static Trigger Continuous(long interval, TimeUnit timeUnit) { +return ContinuousTrigger.create(interval, timeUnit); + } + + /** + * (Scala-friendly) + * A trigger that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + * + * {{{ + *import scala.concurrent.duration._ + *df.writeStream.trigger(Trigger.Continuous(10.seconds)) + * }}} + * @since 2.2.0 --- End diff -- 2.3.0?
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157124394 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1035,6 +1035,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE = +buildConf("spark.sql.streaming.continuous.executorQueueSize") +.internal() +.doc("The size (measured in number of rows) of the queue used in continuous execution to" + + " buffer the results of a ContinuousDataReader.") +.intConf --- End diff -- `longConf`?
[GitHub] spark pull request #19984: [SPARK-22789] Map-only continuous processing exec...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19984#discussion_r157124263 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1035,6 +1035,22 @@ object SQLConf { .booleanConf .createWithDefault(true) + val CONTINUOUS_STREAMING_EXECUTOR_QUEUE_SIZE = +buildConf("spark.sql.streaming.continuous.executorQueueSize") +.internal() +.doc("The size (measured in number of rows) of the queue used in continuous execution to" + + " buffer the results of a ContinuousDataReader.") +.intConf +.createWithDefault(1024) + + val CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS = +buildConf("spark.sql.streaming.continuous.executorPollIntervalMs") + .internal() + .doc("The interval at which continuous execution readers will poll to check whether" + +" the epoch has advanced on the driver.") + .intConf --- End diff -- `timeConf`?
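For context on the `timeConf` suggestion: a time-typed config can accept a string with a unit suffix instead of a bare integer that is implicitly milliseconds. The parser below is a hypothetical, simplified sketch of that idea only. It is not Spark's implementation, its suffix table is an assumption, and a bare number is read as milliseconds to match the `Ms` in the key name.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical sketch of parsing a time-typed config value into milliseconds.
// Not Spark's parser; the supported suffixes below are an assumption.
object TimeStringSketch {
  private val units: Map[String, TimeUnit] = Map(
    "ms"  -> TimeUnit.MILLISECONDS,
    "s"   -> TimeUnit.SECONDS,
    "m"   -> TimeUnit.MINUTES,
    "min" -> TimeUnit.MINUTES,
    "h"   -> TimeUnit.HOURS)

  // Digits, optional whitespace, optional lowercase suffix (full-string match).
  private val TimeRe = """(\d+)\s*([a-z]*)""".r

  def toMillis(str: String): Long = str.trim.toLowerCase match {
    case TimeRe(num, "") => num.toLong // bare number: already milliseconds
    case TimeRe(num, suffix) =>
      val unit = units.getOrElse(suffix,
        throw new IllegalArgumentException(s"Unknown time suffix: $suffix"))
      unit.toMillis(num.toLong)
    case other => throw new IllegalArgumentException(s"Invalid time string: $other")
  }
}
```

Under these assumptions `"100"` and `"100ms"` both parse to `100`, `"2s"` to `2000`, and `"1min"` to `60000`, while an unknown suffix fails fast instead of being silently misread.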
[GitHub] spark issue #19884: [WIP][SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19884 @wesm Yes, I'd like to use it asap to verify this patch and to confirm the behavior of my PR #18754 for `DecimalType` support. Thanks.
[GitHub] spark issue #19861: [SPARK-22387][SQL] Propagate session configs to data sou...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19861 **[Test build #84941 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84941/testReport)** for PR 19861 at commit [`f7d5a4d`](https://github.com/apache/spark/commit/f7d5a4dfce26f2d8d79f8b2529b9676fdf03c917).
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r157122909 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -50,15 +51,23 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} """) case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes { - override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) - override def dataType: DataType = StringType + private lazy val isBinaryMode = children.nonEmpty && children.forall(_.dataType == BinaryType) + + override def inputTypes: Seq[AbstractDataType] = +Seq.fill(children.size)(if (isBinaryMode) BinaryType else StringType) + override def dataType: DataType = if (isBinaryMode) BinaryType else StringType --- End diff -- Is it OK to add a new option just for this case? If we keep adding new options for each case, won't the number of options blow up?
[GitHub] spark pull request #19865: [SPARK-22668][SQL] Ensure no global variables in ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19865#discussion_r157122860 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -217,6 +217,18 @@ class CodegenContext { splitExpressions(expressions = initCodes, funcName = "init", arguments = Nil) } + /** + * Return true if a given variable has been described as a global variable + */ + def isDeclaredMutableState(varName: String): Boolean = { --- End diff -- I think this is a good approach on the caller side, since this function is valid in both debug and production environments.
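The check discussed here only needs the context to remember which names were declared as global (mutable-state) variables; the caller can then assert that a variable it treats as local was never registered globally, and that check behaves the same in debug and production runs. A minimal sketch; `CodegenContextSketch`, the one-argument `addMutableState(String)` and `requireNotGlobal` are hypothetical simplifications, not `CodegenContext`'s real signatures:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, much-simplified codegen context that tracks global variable names.
final class CodegenContextSketch {
    private final Set<String> mutableStateNames = new HashSet<>();

    // Registers a name as a global (class-level) variable and returns it.
    String addMutableState(String varName) {
        mutableStateNames.add(varName);
        return varName;
    }

    // True if varName has been declared as a global variable.
    boolean isDeclaredMutableState(String varName) {
        return mutableStateNames.contains(varName);
    }
}

final class CallerSideCheckSketch {
    // Caller-side check: fail fast if a supposedly local variable is actually global.
    static void requireNotGlobal(CodegenContextSketch ctx, String varName) {
        if (ctx.isDeclaredMutableState(varName)) {
            throw new IllegalStateException(varName + " was declared as a global variable");
        }
    }
}
```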
[GitHub] spark pull request #19949: [SPARK-22762][TEST] Basic tests for IfCoercion an...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19949#discussion_r157122725 --- Diff: sql/core/src/test/resources/sql-tests/inputs/typeCoercion/native/caseWhenCoercion.sql --- @@ -0,0 +1,200 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +CREATE TEMPORARY VIEW t AS SELECT 1; + +SELECT CASE WHEN true THEN cast(1 as tinyint) ELSE cast(2 as tinyint) END FROM t; --- End diff -- In the future, we might do it to support actual TEMP tables. Yeah, please get rid of `short`.
[GitHub] spark issue #19949: [SPARK-22762][TEST] Basic tests for IfCoercion and CaseW...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19949 LGTM except the comment about `short`
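The CASE WHEN coercion tests check how branches of different numeric types are widened to one common result type. A simplified sketch, assuming a precedence order of tinyint < smallint < int < bigint < float < double and ignoring decimals, strings and the other types the test file covers; the enum and method names are hypothetical:

```java
import java.util.List;

// Hypothetical numeric types, ordered from narrowest to widest.
enum NumericTypeSketch { TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE }

final class WideningSketch {
    // The common type of two branches is whichever appears later in the precedence order.
    static NumericTypeSketch commonType(NumericTypeSketch a, NumericTypeSketch b) {
        return a.ordinal() >= b.ordinal() ? a : b;
    }

    // Folds the rule over every THEN/ELSE branch; assumes at least one branch.
    static NumericTypeSketch commonTypeOfBranches(List<NumericTypeSketch> branchTypes) {
        NumericTypeSketch result = branchTypes.get(0);
        for (NumericTypeSketch t : branchTypes) {
            result = commonType(result, t);
        }
        return result;
    }
}
```

Under this rule, `CASE WHEN true THEN cast(1 as tinyint) ELSE cast(2 as tinyint) END` stays `tinyint`, while mixing `tinyint` with `int` yields `int`.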
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r157122586 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -50,15 +51,23 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} """) case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes { - override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) - override def dataType: DataType = StringType + private lazy val isBinaryMode = children.nonEmpty && children.forall(_.dataType == BinaryType) --- End diff -- will check some patterns
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r157122613 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -50,15 +51,23 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} """) case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes { - override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) - override def dataType: DataType = StringType + private lazy val isBinaryMode = children.nonEmpty && children.forall(_.dataType == BinaryType) + + override def inputTypes: Seq[AbstractDataType] = +Seq.fill(children.size)(if (isBinaryMode) BinaryType else StringType) + override def dataType: DataType = if (isBinaryMode) BinaryType else StringType --- End diff -- ok
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r157122520 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -50,15 +51,23 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} """) case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes { - override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) - override def dataType: DataType = StringType + private lazy val isBinaryMode = children.nonEmpty && children.forall(_.dataType == BinaryType) + + override def inputTypes: Seq[AbstractDataType] = +Seq.fill(children.size)(if (isBinaryMode) BinaryType else StringType) + override def dataType: DataType = if (isBinaryMode) BinaryType else StringType --- End diff -- Conf is needed for sure. We also need a `Migration Guide`.
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r157122430 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -50,15 +51,23 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} """) case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes { - override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) - override def dataType: DataType = StringType + private lazy val isBinaryMode = children.nonEmpty && children.forall(_.dataType == BinaryType) --- End diff -- > If all inputs are binary, concat also outputs binary. Is this true in Hive and others?
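The rule under review is that `concat` returns binary only when there is at least one input and every input is binary; otherwise it works on strings. Since the comments above ask for a config to keep the old behavior, the sketch below takes a hypothetical `concatBinaryAsString` flag. The flag name, the `SqlTypeSketch` enum and the "any null input gives null" rule are assumptions for illustration, not the PR's code:

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

// Hypothetical stand-ins for StringType and BinaryType.
enum SqlTypeSketch { STRING, BINARY }

final class ConcatSketch {
    // Mirrors isBinaryMode: non-empty and all inputs binary, unless the legacy flag is set.
    static SqlTypeSketch resultType(List<SqlTypeSketch> inputTypes, boolean concatBinaryAsString) {
        boolean binaryMode = !concatBinaryAsString
            && !inputTypes.isEmpty()
            && inputTypes.stream().allMatch(t -> t == SqlTypeSketch.BINARY);
        return binaryMode ? SqlTypeSketch.BINARY : SqlTypeSketch.STRING;
    }

    // Byte-level concatenation used in binary mode; returns null if any input is null.
    static byte[] concatBinary(List<byte[]> inputs) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] input : inputs) {
            if (input == null) {
                return null;
            }
            out.write(input, 0, input.length);
        }
        return out.toByteArray();
    }
}
```

With the flag off, two binary inputs produce a binary result; with it on, the same inputs fall back to the string result, which is the backward-compatible path the reviewers are asking for.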
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19982 Yeah, sure. I have plenty of bandwidth now :)
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19982 cc @maropu Feel free to submit a PR for adding SSB
[GitHub] spark pull request #19946: [SPARK-22648] [Scheduler] Spark on Kubernetes - D...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19946#discussion_r157120443 --- Diff: docs/running-on-kubernetes.md --- @@ -0,0 +1,502 @@ +--- +layout: global +title: Running Spark on Kubernetes +--- +* This will become a table of contents (this text will be scraped). +{:toc} + +Spark can run on clusters managed by [Kubernetes](https://kubernetes.io). This feature makes use of native +Kubernetes scheduler that has been added to Spark. + +# Prerequisites + +* A runnable distribution of Spark 2.3 or above. +* A running Kubernetes cluster at version >= 1.6 with access configured to it using +[kubectl](https://kubernetes.io/docs/user-guide/prereqs/). If you do not already have a working Kubernetes cluster, +you may setup a test cluster on your local machine using +[minikube](https://kubernetes.io/docs/getting-started-guides/minikube/). + * We recommend using the latest release of minikube with the DNS addon enabled. +* You must have appropriate permissions to list, create, edit and delete +[pods](https://kubernetes.io/docs/user-guide/pods/) in your cluster. You can verify that you can list these resources +by running `kubectl auth can-i <list|create|edit|delete> pods`. + * The service account credentials used by the driver pods must be allowed to create pods, services and configmaps. +* You must have [Kubernetes DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/) configured in your cluster. + +# How it works + + + + + +spark-submit can be directly used to submit a Spark application to a Kubernetes cluster. +The submission mechanism works as follows: + +* Spark creates a Spark driver running within a [Kubernetes pod](https://kubernetes.io/docs/concepts/workloads/pods/pod/). +* The driver creates executors which are also running within Kubernetes pods and connects to them, and executes application code.
+* When the application completes, the executor pods terminate and are cleaned up, but the driver pod persists +logs and remains in "completed" state in the Kubernetes API until it's eventually garbage collected or manually cleaned up. + +Note that in the completed state, the driver pod does *not* use any computational or memory resources. + +The driver and executor pod scheduling is handled by Kubernetes. It will be possible to affect Kubernetes scheduling +decisions for driver and executor pods using advanced primitives like +[node selectors](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#nodeselector) +and [node/pod affinities](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity) +in a future release. + +# Submitting Applications to Kubernetes + +## Docker Images + +Kubernetes requires users to supply images that can be deployed into containers within pods. The images are built to +be run in a container runtime environment that Kubernetes supports. Docker is a container runtime environment that is --- End diff -- Just my two cents: if we can foresee adding support for other container runtimes, then we should consider using `container.image` instead of `docker.image` from the beginning, because renaming a config is also considered a behavior change, which usually takes more effort to handle. If that is not the case (we are satisfied with supporting only Docker containers), then it would be fine to just keep the `docker.image` phrase.
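Renaming a config key means the old key has to keep working for existing jobs, which is the extra effort described above. A minimal sketch of a read-with-fallback lookup; both key names are hypothetical examples of the `container.image` and `docker.image` phrasings, and this is not Spark's config API:

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical lookup that prefers a new config key but still honors a renamed one.
final class RenamedConfigSketch {
    static final String NEW_KEY = "spark.kubernetes.container.image";
    static final String OLD_KEY = "spark.kubernetes.docker.image";

    static Optional<String> image(Map<String, String> conf) {
        String value = conf.get(NEW_KEY);
        if (value != null) {
            return Optional.of(value);
        }
        // Fall back to the old key so existing submissions keep working after the rename.
        return Optional.ofNullable(conf.get(OLD_KEY));
    }
}
```

Every renamed key needs a fallback like this (and usually a deprecation warning) for as long as the old name is supported, which is why picking the general name up front is cheaper.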
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19982 **[Test build #84940 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84940/testReport)** for PR 19982 at commit [`2671416`](https://github.com/apache/spark/commit/2671416688ca6275556602b2f1990cd4361b95e6).
[GitHub] spark issue #19773: [SPARK-22546][SQL] Supporting for changing column dataTy...
Github user xuanyuanking commented on the issue: https://github.com/apache/spark/pull/19773 gentle ping @gatorsmile
[GitHub] spark pull request #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suit...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19982#discussion_r157119974 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodeGenerator} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils + +abstract class BenchmarkQueryTest extends QueryTest with SharedSQLContext with BeforeAndAfterAll { + + // When Utils.isTesting is true, the RuleExecutor will issue an exception when hitting + // the max iteration of analyzer/optimizer batches. 
+ assert(Utils.isTesting, "spark.testing is not set to true") + + /** + * Drop all the tables + */ + protected override def afterAll(): Unit = { +try { + // For debugging dump some statistics about how much time was spent in various optimizer rules + logWarning(RuleExecutor.dumpTimeSpent()) --- End diff -- oh, ok
[GitHub] spark pull request #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19982#discussion_r157119941 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodeGenerator} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils + +abstract class BenchmarkQueryTest extends QueryTest with SharedSQLContext with BeforeAndAfterAll { + + // When Utils.isTesting is true, the RuleExecutor will issue an exception when hitting + // the max iteration of analyzer/optimizer batches. 
+ assert(Utils.isTesting, "spark.testing is not set to true") + + /** + * Drop all the tables + */ + protected override def afterAll(): Unit = { +try { + // For debugging dump some statistics about how much time was spent in various optimizer rules + logWarning(RuleExecutor.dumpTimeSpent()) --- End diff -- This is just to give an overall picture of how long each rule takes. I plan to submit another PR to track which rules take effect for a specific query and also record their time cost.
[GitHub] spark pull request #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suit...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19982#discussion_r157119768 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodeGenerator} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.Utils + +abstract class BenchmarkQueryTest extends QueryTest with SharedSQLContext with BeforeAndAfterAll { + + // When Utils.isTesting is true, the RuleExecutor will issue an exception when hitting + // the max iteration of analyzer/optimizer batches. 
+ assert(Utils.isTesting, "spark.testing is not set to true") + + /** + * Drop all the tables + */ + protected override def afterAll(): Unit = { +try { + // For debugging dump some statistics about how much time was spent in various optimizer rules + logWarning(RuleExecutor.dumpTimeSpent()) --- End diff -- If we do use logWarning, the messages will not be shown in the test log.
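`RuleExecutor.dumpTimeSpent()` reports accumulated time per rule, and the follow-up PR mentioned above would also record which rules actually change a plan. A minimal sketch of that bookkeeping, where a rule is modeled as a `UnaryOperator` and a run counts as "effective" when its output is not `equals` to its input; `RuleTimerSketch` and its methods are hypothetical, not the `RuleExecutor` API:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical per-rule metrics: total time, number of runs, and runs that changed the plan.
final class RuleTimerSketch {
    static final class RuleStats {
        long totalNanos;
        int runs;
        int effectiveRuns;
    }

    private final Map<String, RuleStats> stats = new LinkedHashMap<>();

    <T> T run(String ruleName, UnaryOperator<T> rule, T plan) {
        long start = System.nanoTime();
        T result = rule.apply(plan);
        RuleStats s = stats.computeIfAbsent(ruleName, k -> new RuleStats());
        s.totalNanos += System.nanoTime() - start;
        s.runs++;
        // A run is "effective" when the rule returned a different plan.
        if (!result.equals(plan)) {
            s.effectiveRuns++;
        }
        return result;
    }

    RuleStats statsFor(String ruleName) {
        return stats.get(ruleName);
    }

    // One line per rule, in the order the rules were first run.
    String dump() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, RuleStats> e : stats.entrySet()) {
            RuleStats s = e.getValue();
            sb.append(String.format("%s: %d ns, %d/%d effective runs%n",
                e.getKey(), s.totalNanos, s.effectiveRuns, s.runs));
        }
        return sb.toString();
    }
}
```

Timings vary from run to run, but the effective-run counts are deterministic, which makes them easier to compare across queries.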
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19941 LGTM pending Jenkins
[GitHub] spark issue #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAndRead
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19941 **[Test build #84939 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84939/testReport)** for PR 19941 at commit [`5221c7c`](https://github.com/apache/spark/commit/5221c7cf11dc0accfcd1205177d0332bca042ffc).
[GitHub] spark pull request #19941: [SPARK-22753][SQL] Get rid of dataSource.writeAnd...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/19941#discussion_r157118985 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala --- @@ -67,8 +67,9 @@ case class InsertIntoDataSourceDirCommand( val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists try { - sparkSession.sessionState.executePlan(dataSource.planForWriting(saveMode, query)) - dataSource.writeAndRead(saveMode, query) --- End diff -- Revert done. Sorry, maybe I misunderstood what you meant by 'get rid of dataSource.writeAndRead'. Following your discussion with Wenchen in https://github.com/apache/spark/pull/16481, shouldn't we make `writeAndRead` just return a BaseRelation without writing to the destination? Thank you for your patient reply.
[GitHub] spark pull request #19350: [SPARK-22126][ML][WIP] Fix model-specific optimiz...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19350#discussion_r157118663 --- Diff: mllib/src/main/scala/org/apache/spark/ml/Estimator.scala --- @@ -82,5 +86,49 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage { paramMaps.map(fit(dataset, _)) } + /** + * (Java-specific) + */ + @Since("2.3.0") + def fit(dataset: Dataset[_], paramMaps: Array[ParamMap], +unpersistDatasetAfterFitting: Boolean, executionContext: ExecutionContext, +modelCallback: VoidFunction2[Model[_], Int]): Unit = { +// Fit models in a Future for training in parallel +val modelFutures = paramMaps.map { paramMap => + Future[Model[_]] { +fit(dataset, paramMap).asInstanceOf[Model[_]] --- End diff -- @MLnick I discussed this with @jkbradley and @MrBago offline, and here is the latest proposal: https://docs.google.com/document/d/1xw5M4sp1e0eQie75yIt-r6-GTuD5vpFf_I6v-AFBM3M/edit?usp=sharing Thanks!
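The new `fit` overload fits one model per `ParamMap` inside a `Future` on the given execution context and hands each model, together with its index, to a callback. A minimal Java sketch of that pattern with `CompletableFuture`; `ParallelFitSketch` is hypothetical, and modeling a fit as a plain `Function` (and leaving out `unpersistDatasetAfterFitting`) are simplifying assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class ParallelFitSketch {
    // Fits one model per parameter set on `pool`, then calls modelCallback(model, index).
    // The callback may run on a pool thread or on the calling thread, so it must be thread-safe.
    static <P, M> void fitAll(List<P> paramMaps, Function<P, M> fit,
                              ExecutorService pool, BiConsumer<M, Integer> modelCallback) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < paramMaps.size(); i++) {
            final int index = i;
            final P params = paramMaps.get(i);
            futures.add(CompletableFuture
                .supplyAsync(() -> fit.apply(params), pool)
                .thenAccept(model -> modelCallback.accept(model, index)));
        }
        // Block until every fit and callback has finished; throws a
        // CompletionException if any of them failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
}
```

The pool size plays the role of the parallelism setting: with a single-thread pool the fits run one after another, while each callback can still start as soon as its own model is ready.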
[GitHub] spark pull request #19864: [SPARK-22673][SQL] InMemoryRelation should utiliz...
Github user CodingCat commented on a diff in the pull request: https://github.com/apache/spark/pull/19864#discussion_r157118091 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala --- @@ -80,6 +80,14 @@ class CacheManager extends Logging { cachedData.isEmpty } + private def extractStatsOfPlanForCache(plan: LogicalPlan): Option[Statistics] = { +if (plan.stats.rowCount.isDefined) { --- End diff -- change the code here and file a JIRA in https://issues.apache.org/jira/browse/SPARK-22790
[GitHub] spark pull request #19961: [SPARK-22496][SQL] thrift server adds operation l...
Github user ChenjunZou commented on a diff in the pull request: https://github.com/apache/spark/pull/19961#discussion_r157117909 --- Diff: sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java --- @@ -67,4 +68,16 @@ protected void setConfOverlay(Map<String, String> confOverlay) { this.confOverlay = confOverlay; } } + + protected void registerCurrentOperationLog() { +if (isOperationLogEnabled) { + if (operationLog == null) { +LOG.warn("Failed to get current OperationLog object of Operation: " + +getHandle().getHandleIdentifier()); --- End diff -- thanks @gatorsmile
[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19041 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84935/ Test PASSed.
[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19041 Merged build finished. Test PASSed.
[GitHub] spark issue #19041: [SPARK-21097][CORE] Add option to recover cached data
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19041 **[Test build #84935 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84935/testReport)** for PR 19041 at commit [`036fea4`](https://github.com/apache/spark/commit/036fea44be5bbd3ab0d33b11a98ab17962cb91fa). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19864: [SPARK-22673][SQL] InMemoryRelation should utilize exist...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19864 **[Test build #84938 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84938/testReport)** for PR 19864 at commit [`4b2fcb6`](https://github.com/apache/spark/commit/4b2fcb6a9fd61fe771ab323f41935541b0223bdf).
[GitHub] spark issue #19946: [SPARK-22648] [Scheduler] Spark on Kubernetes - Document...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19946 Merged build finished. Test PASSed.
[GitHub] spark issue #19946: [SPARK-22648] [Scheduler] Spark on Kubernetes - Document...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19946 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84934/ Test PASSed.
[GitHub] spark issue #19946: [SPARK-22648] [Scheduler] Spark on Kubernetes - Document...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19946 **[Test build #84934 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84934/testReport)** for PR 19946 at commit [`873f04d`](https://github.com/apache/spark/commit/873f04dc3c1a4c48cbeac5766aec7c23c810c0cb). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r157116687 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -50,15 +51,23 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String} """) case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes { - override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType) - override def dataType: DataType = StringType + private lazy val isBinaryMode = children.nonEmpty && children.forall(_.dataType == BinaryType) + + override def inputTypes: Seq[AbstractDataType] = +Seq.fill(children.size)(if (isBinaryMode) BinaryType else StringType) + override def dataType: DataType = if (isBinaryMode) BinaryType else StringType --- End diff -- Yeah, it should be. Is there any existing option for keeping backward compatibility? One option, I think, is to use a new Hive type coercion option: https://issues.apache.org/jira/browse/SPARK-22722 Or, how about adding a new option?
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19982 Merged build finished. Test PASSed.
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19982 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84936/ Test PASSed.
[GitHub] spark issue #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suite
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19982 **[Test build #84936 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84936/testReport)** for PR 19982 at commit [`9dc4457`](https://github.com/apache/spark/commit/9dc445739016ce4168523840b65c439ddb54a99b). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `abstract class BenchmarkQueryTest extends QueryTest with SharedSQLContext with BeforeAndAfterAll ` * `class TPCDSQuerySuite extends BenchmarkQueryTest ` * `class TPCHQuerySuite extends BenchmarkQueryTest `
[GitHub] spark issue #19751: [SPARK-20653][core] Add cleaning of old elements from th...
Github user gengliangwang commented on the issue: https://github.com/apache/spark/pull/19751 LGTM
[GitHub] spark pull request #16486: [SPARK-13610][ML] Create a Transformer to disasse...
Github user leonfl closed the pull request at: https://github.com/apache/spark/pull/16486
[GitHub] spark issue #19975: [SPARK-22781][SS] Support creating streaming dataset wit...
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19975 Also, @brkyvz, could you review this PR?
[GitHub] spark issue #19904: [SPARK-22707][ML] Optimize CrossValidator memory occupat...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19904

I discussed this with @MrBago offline. Here is a summary of my current thinking. I compare three approaches below; after the discussion I realized none of them is ideal, so we have to make a trade-off.

## Approach 1

The approach proposed by @MrBago at https://github.com/apache/spark/pull/19904#discussion_r156751569

This approach resolves the issue of model objects staying referenced, so the models can be GCed in time. **But in some cases it still does not resolve the O(N) model memory occupation issue.** Take an extreme case: suppose we set `parallelism = 1` and there are 100 paramMaps, so we have 100 fitting & evaluation tasks. Because `parallelism = 1`, the code has to wait for all 100 fitting tasks to complete **(at which point the memory occupied by models already reaches 100 * sizeof(model))**, then it unpersists the training dataset, and only then does it run the 100 evaluation tasks.

## Approach 2

This approach is the code currently in my PR. It guarantees that, in every case, the peak memory occupied by models is `O(numParallelism * sizeof(model))`. However, in some extreme cases, unpersisting the training dataset is delayed until most of the evaluation tasks have completed. Suppose `parallelism = 1` and there are 100 fitting & evaluation tasks: the tasks have to execute one by one, so the training dataset is unpersisted only after the first 99 fitting & evaluation tasks and the 100th fitting task have completed.

## Approach 3

After comparing approaches 1 and 2, I realized that when parallelism is low but there are many fitting & evaluation tasks, we cannot achieve both of the following goals:
- Keep the peak memory occupied by models at O(parallelism * sizeof(model)).
- Unpersist the training dataset before most of the evaluation tasks start.
So I vote for a simpler approach: move the unpersisting of the training dataset to the end (does this really matter?). Goal 1 is more important: we must keep the peak memory occupied by models at O(parallelism * sizeof(model)), otherwise there is a high risk of OOM. The code would look like this:

```scala
val foldMetricFutures = epm.zipWithIndex.map { case (paramMap, paramIndex) =>
  Future[Double] {
    val model = est.fit(trainingDataset, paramMap).asInstanceOf[Model[_]]
    // ... other minor code
    val metric = eval.evaluate(model.transform(validationDataset, paramMap))
    logDebug(s"Got metric $metric for model trained with $paramMap.")
    metric
  } (executionContext)
}
val foldMetrics = foldMetricFutures.map(ThreadUtils.awaitResult(_, Duration.Inf))
trainingDataset.unpersist() // <--- unpersist at the end
validationDataset.unpersist()
```

Gentle ping @jkbradley @MrBago @sethah @BryanCutler @holdenk
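To make the memory argument concrete, here is a self-contained sketch that counts how many models are alive at once under Approach 1 (fit everything, then evaluate) and Approach 3 (fit and evaluate inside each future, unpersist at the end). `FakeModel`, `evaluate`, and the live-model counter are hypothetical stand-ins for `Model[_]`, the evaluator, and real memory usage; `Await.result` stands in for Spark's internal `ThreadUtils.awaitResult`.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object FitEvalSketch {
  // Hypothetical stand-in for a fitted Model[_].
  final case class FakeModel(param: Int)

  // Hypothetical stand-in for eval.evaluate(model.transform(...)).
  private def evaluate(model: FakeModel): Double = model.param * 0.5

  // Counts live models and remembers the peak, as a proxy for model memory.
  final class ModelTracker {
    private var live = 0
    private var peak = 0
    def created(): Unit = synchronized { live += 1; peak = math.max(peak, live) }
    def released(): Unit = synchronized { live -= 1 }
    def peakLive: Int = synchronized { peak }
  }

  // Approach 1: fit every model first, then evaluate them all.
  def fitAllThenEvaluate(params: Seq[Int], parallelism: Int): (Seq[Double], Int) = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val tracker = new ModelTracker
    try {
      val modelFutures = params.map { p => Future { tracker.created(); FakeModel(p) } }
      val models = modelFutures.map(Await.result(_, Duration.Inf))
      // The training data would be unpersisted here, but every model is still alive.
      val metricFutures = models.map { m =>
        Future { val metric = evaluate(m); tracker.released(); metric }
      }
      (metricFutures.map(Await.result(_, Duration.Inf)), tracker.peakLive)
    } finally pool.shutdown()
  }

  // Approach 3: fit and evaluate inside one future, so each model is dropped right away.
  def fitAndEvaluateEach(params: Seq[Int], parallelism: Int): (Seq[Double], Int) = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val tracker = new ModelTracker
    try {
      val metricFutures = params.map { p =>
        Future {
          tracker.created()
          val model = FakeModel(p) // "fit"
          val metric = evaluate(model)
          tracker.released() // the model is unreachable once this future returns
          metric
        }
      }
      val metrics = metricFutures.map(Await.result(_, Duration.Inf))
      // trainingDataset.unpersist() and validationDataset.unpersist() would go here, at the end.
      (metrics, tracker.peakLive)
    } finally pool.shutdown()
  }
}
```

With `parallelism = 1` and 100 param maps, Approach 1 peaks at 100 live models while Approach 3 peaks at 1, which is the O(N) versus O(parallelism) difference discussed in the comment.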
[GitHub] spark pull request #19977: [SPARK-22771][SQL] Concatenate binary inputs into...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19977#discussion_r157113499

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala ---
@@ -50,15 +51,23 @@ import org.apache.spark.unsafe.types.{ByteArray, UTF8String}
   """)
 case class Concat(children: Seq[Expression]) extends Expression with ImplicitCastInputTypes {
-  override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.size)(StringType)
-  override def dataType: DataType = StringType
+  private lazy val isBinaryMode = children.nonEmpty && children.forall(_.dataType == BinaryType)
+
+  override def inputTypes: Seq[AbstractDataType] =
+    Seq.fill(children.size)(if (isBinaryMode) BinaryType else StringType)
+  override def dataType: DataType = if (isBinaryMode) BinaryType else StringType
--- End diff --

Shouldn't we worry about backward compatibility?
[GitHub] spark pull request #19982: [SPARK-22787] [TEST] [SQL] Add a TPC-H query suit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19982#discussion_r157113164

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/TPCHQuerySuite.scala ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.util.resourceToString
+import org.apache.spark.util.Utils
+
+/**
+ * This test suite ensures all the TPC-H queries can be successfully analyzed, optimized
+ * and compiled without hitting the max iteration threshold.
+ */
+class TPCHQuerySuite extends BenchmarkQueryTest {
+
+  /**
+   * Drop all the tables
+   */
+  protected override def afterAll(): Unit = {
--- End diff --

Yes, we don't need to override it here.
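A sketch of why the override can be dropped, assuming the shared `BenchmarkQueryTest` base class already drops the tables in its own `afterAll`: cleanup written once in the base class is inherited by every suite. The classes below are hypothetical stand-ins that use an in-memory set instead of ScalaTest's `BeforeAndAfterAll` and a Spark catalog.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the shared base suite; real suites use ScalaTest's
// BeforeAndAfterAll and a SparkSession catalog instead of an in-memory set.
abstract class BenchmarkQueryTestSketch {
  // Names of the tables a concrete suite registers.
  protected def tableNames: Seq[String]

  // Simulated session catalog.
  val catalog: mutable.Set[String] = mutable.Set.empty[String]

  def beforeAll(): Unit = tableNames.foreach(name => catalog += name)

  // Cleanup is written once here, so concrete suites do not override afterAll.
  def afterAll(): Unit = tableNames.foreach(name => catalog -= name)
}

// Hypothetical TPC-H suite: it only lists its tables and inherits the cleanup.
class TPCHQuerySuiteSketch extends BenchmarkQueryTestSketch {
  override protected def tableNames: Seq[String] =
    Seq("customer", "lineitem", "nation", "orders", "part", "partsupp", "region", "supplier")
}
```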
[GitHub] spark issue #19848: [SPARK-22162] Executors and the driver should use consis...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19848 I found that Spark SQL always uses 0 as the job ID... How do Hadoop committers use the job ID? Only for recovery?
[GitHub] spark issue #19977: [SPARK-22771][SQL] Concatenate binary inputs into a bina...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19977

Checked:
```
hive> create table t1(a string, b string);
hive> create view v1 as select a || b from t1;
hive> describe v1;
OK
_c0 string
hive> create table t2(a binary, b binary);
hive> create view v2 as select a || b from t2;
hive> describe v2;
_c0 binary
```
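The Hive output shows `||` on binary columns producing `binary`. A minimal sketch of what "concatenate binary inputs into a binary output" means at the value level, in plain Scala: byte arrays are joined in order, and `None` models SQL NULL. The rule that any NULL input makes the result NULL mirrors Spark's `concat`, but it is stated here as an assumption of the sketch; the object and method names are hypothetical.

```scala
import java.io.ByteArrayOutputStream

object BinaryConcatSketch {
  // Joins binary inputs, in order, into one binary value. None models SQL NULL,
  // and (as an assumption of this sketch) any NULL input makes the result NULL.
  def concatBinary(inputs: Seq[Option[Array[Byte]]]): Option[Array[Byte]] =
    if (inputs.exists(_.isEmpty)) None
    else {
      val out = new ByteArrayOutputStream()
      inputs.flatten.foreach(bytes => out.write(bytes, 0, bytes.length))
      Some(out.toByteArray)
    }

  // The string-mode counterpart, for comparison.
  def concatString(inputs: Seq[Option[String]]): Option[String] =
    if (inputs.exists(_.isEmpty)) None else Some(inputs.flatten.mkString)
}
```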
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19848#discussion_r157111981

--- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
       config: HadoopWriteConfigUtil[K, V]): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id
     // Set up a job.
     val jobTrackerId = createJobTrackerID(new Date())
--- End diff --

`jobTrackerId` is also not unique; is that OK?
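A sketch of why a timestamp-based ID is not unique, assuming `createJobTrackerID` formats the job start time with second precision using a `yyyyMMddHHmmss` pattern (an assumption here, not shown in the diff): two jobs that start within the same second get the same `jobTrackerId`. The object name is hypothetical.

```scala
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

object JobTrackerIdSketch {
  // Assumed format: the job start time truncated to whole seconds.
  // Two jobs that start within the same second therefore share an ID.
  def createJobTrackerID(time: Date): String =
    new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(time)
}
```

Under this assumption, if uniqueness matters it has to come from another part of the job ID rather than from the timestamp.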
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19811 For rarely seen types, I think we don't need to put them in arrays for now, because there will likely be only a few elements of those types.
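A hypothetical sketch of the policy suggested in this comment: group generated-code mutable states by Java type, compact a type into an array only when it has at least `minStatesForArray` states, and keep rarely seen types as plain fields. `MutableState`, `accessors`, the array naming scheme, and the threshold are all illustrative assumptions, not the API in the PR.

```scala
object MutableStateCompactionSketch {
  // Hypothetical description of one mutable state in generated Java code.
  final case class MutableState(javaType: String, name: String)

  // Maps each state name to the Java expression used to access it.
  // Types with at least `minStatesForArray` states share one array; rarer types stay plain fields.
  def accessors(states: Seq[MutableState], minStatesForArray: Int): Map[String, String] =
    states.groupBy(_.javaType).toSeq.flatMap { case (javaType, group) =>
      if (group.size >= minStatesForArray) {
        val arrayName = s"mutableStateArray_${javaType.filter(_.isLetterOrDigit)}"
        group.zipWithIndex.map { case (state, i) => state.name -> s"$arrayName[$i]" }
      } else {
        group.map(state => state.name -> state.name)
      }
    }.toMap
}
```

Keeping rare types as plain fields costs only a few constant pool entries, which is the trade-off the comment relies on.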