[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21564 **[Test build #91856 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91856/testReport)** for PR 21564 at commit [`405ba94`](https://github.com/apache/spark/commit/405ba9441973a186569bbf733907bd9445331c34). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21366#discussion_r195542619

    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.scheduler.cluster.k8s
    +
    +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
    +
    +import io.fabric8.kubernetes.client.KubernetesClient
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.k8s.Config._
    +import org.apache.spark.deploy.k8s.Constants._
    +import org.apache.spark.util.ThreadUtils
    +
    +private[spark] class ExecutorPodsPollingSnapshotSource(
    +    conf: SparkConf,
    +    kubernetesClient: KubernetesClient,
    +    snapshotsStore: ExecutorPodsSnapshotsStore,
    +    pollingExecutor: ScheduledExecutorService) {
    +
    +  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
    +
    +  private var pollingFuture: Future[_] = _
    +
    +  def start(applicationId: String): Unit = {
    +    require(pollingFuture == null, "Cannot start polling more than once.")
    +    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
    +      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
    +  }
    +
    +  def stop(): Unit = {
    +    if (pollingFuture != null) {
    +      pollingFuture.cancel(true)
    +      pollingFuture = null
    +    }
    +    ThreadUtils.shutdown(pollingExecutor)
    +  }
    +
    +  private class PollRunnable(applicationId: String) extends Runnable {
    +    override def run(): Unit = {
    +      snapshotsStore.replaceSnapshot(kubernetesClient
    --- End diff --

    I see - I think what we actually want is `ExecutorPodsSnapshotStoreImpl` to initialize the subscriber with its current snapshot. That creates the semantics where the new subscriber will first receive the most up to date state immediately.
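The semantics proposed in this comment can be sketched as follows. This is only an illustration under stated assumptions: `PodsSnapshot`, `SimpleSnapshotsStore`, and `addSubscriber` are hypothetical names, not the PR's `ExecutorPodsSnapshotStoreImpl`, and the snapshot is reduced to a map from pod name to phase.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the real snapshot type: pod name -> phase.
final case class PodsSnapshot(pods: Map[String, String])

// Hypothetical store, not the PR's implementation.
class SimpleSnapshotsStore {
  private var current = PodsSnapshot(Map.empty)
  private val subscribers = ArrayBuffer.empty[PodsSnapshot => Unit]

  // Register a subscriber and immediately hand it the current snapshot,
  // so it does not have to wait for the next poll to learn the present state.
  def addSubscriber(onNewSnapshot: PodsSnapshot => Unit): Unit = synchronized {
    subscribers += onNewSnapshot
    onNewSnapshot(current)
  }

  // Replace the whole snapshot (as a polling source would) and notify every subscriber.
  def replaceSnapshot(pods: Map[String, String]): Unit = synchronized {
    current = PodsSnapshot(pods)
    subscribers.foreach(_(current))
  }
}
```

With this shape, a subscriber that registers late still sees the latest known state first, and the initial delivery is an explicit step in `addSubscriber` rather than a side effect of the first poll.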
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195542491

    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -304,6 +305,11 @@ class SparkContext(config: SparkConf) extends Logging {
         _dagScheduler = ds
       }
     
    +  private[spark] def heartbeater: Heartbeater = _heartbeater
    +  private[spark] def heartbeater_=(hb: Heartbeater): Unit = {
    --- End diff --

    These aren't used -- I'll remove.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195542018

    --- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
    @@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
         val removeReason: Option[String],
         val executorLogs: Map[String, String],
         val memoryMetrics: Option[MemoryMetrics],
    -    val blacklistedInStages: Set[Int])
    +    val blacklistedInStages: Set[Int],
    +    @JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
    +    @JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
    +    val peakMemoryMetrics: Option[Array[Long]])
     
     class MemoryMetrics private[spark](
         val usedOnHeapStorageMemory: Long,
         val usedOffHeapStorageMemory: Long,
         val totalOnHeapStorageMemory: Long,
         val totalOffHeapStorageMemory: Long)
     
    +/** deserialzer for peakMemoryMetrics: convert to array ordered by metric name */
    +class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] {
    +  override def deserialize(
    +      jsonParser: JsonParser,
    +      deserializationContext: DeserializationContext): Option[Array[Long]] = {
    +    val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, Object]]])
    +    metricsMap match {
    +      case Some(metrics) =>
    +        Some(MetricGetter.values.map { m =>
    +          metrics.getOrElse (m.name, 0L) match {
    +            case intVal: Int => intVal.toLong
    +            case longVal: Long => longVal
    +          }
    +        }.toArray)
    +      case None => None
    +    }
    +  }
    +}
    +
    +/** serializer for peakMemoryMetrics: convert array to map with metric name as key */
    +class PeakMemoryMetricsSerializer extends JsonSerializer[Option[Array[Long]]] {
    +  override def serialize(
    +      metrics: Option[Array[Long]],
    +      jsonGenerator: JsonGenerator,
    +      serializerProvider: SerializerProvider): Unit = {
    +    metrics match {
    +      case Some(m) =>
    +        val metricsMap = (0 until MetricGetter.values.length).map { idx =>
    --- End diff --

    It's still being used in JsonProtocol.executorMetricsToJson -- let me know if you'd like me to convert that to use values instead.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195541136

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -234,8 +272,18 @@ private[spark] class EventLoggingListener(
         }
       }
     
    -  // No-op because logging every update would be overkill
    -  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    if (shouldLogExecutorMetricsUpdates) {
    +      // For the active stages, record any new peak values for the memory metrics for the executor
    +      event.executorUpdates.foreach { executorUpdates =>
    +        liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
    +          val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
    +            event.execId, new PeakExecutorMetrics())
    +          peakMetrics.compareAndUpdate(executorUpdates)
    --- End diff --

    What would be the right timestamp? Peaks for different metrics could have different timestamps.
[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21564 Merged build finished. Test PASSed.
[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21564 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91855/ Test PASSed.
[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21564 **[Test build #91855 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91855/testReport)** for PR 21564 at commit [`0ef99cc`](https://github.com/apache/spark/commit/0ef99cc972a54fd9c98338e54a7e4e6b9a213654). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21569 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/152/ Test PASSed.
[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21569 **[Test build #91865 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91865/testReport)** for PR 21569 at commit [`fdc3e75`](https://github.com/apache/spark/commit/fdc3e759d5d936018fe79a799065476ab0b73ec9).
[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21569 Merged build finished. Test PASSed.
[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21569 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4043/ Test PASSed.
[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21569 Merged build finished. Test PASSed.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195539837

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
     
       // Events that trigger a flush
       override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    +    if (shouldLogExecutorMetricsUpdates) {
    +      // clear out any previous attempts, that did not have a stage completed event
    --- End diff --

    Tracking task start and end would be some amount of overhead. If it's a relatively unlikely corner case, and unlikely to have much impact on the numbers, it may be better to leave as is.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195538848

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
     
       // Events that trigger a flush
       override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    +    if (shouldLogExecutorMetricsUpdates) {
    +      // clear out any previous attempts, that did not have a stage completed event
    +      val prevAttemptId = event.stageInfo.attemptNumber() - 1
    +      for (attemptId <- 0 to prevAttemptId) {
    +        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
    +      }
    +
    +      // log the peak executor metrics for the stage, for each live executor,
    +      // whether or not the executor is running tasks for the stage
    +      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
    +      val executorMap = liveStageExecutorMetrics.remove(
    +        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
    +      executorMap.foreach {
    +        executorEntry => {
    +          for ((executorId, peakExecutorMetrics) <- executorEntry) {
    +            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
    --- End diff --

    We need to pass in a value for timestamp, but there isn't really one for the peak metrics, since times for each peak could be different. When processing, -1 will help indicate that the event is coming from the history log, and contains the peak values for the stage that is just ending. When updating the stage executor peaks (peak executor values stored for each active stage), we can replace all of the peak executor metric values instead of updating with the max of current and new values for each metric.

    As an example, suppose there is the following scenario:

    T1: start of stage 1
    T2: peak value of 1000 for metric m1
    T3: start of stage 2
    T4: stage 1 ends, and peak metric values for stage 1 are logged, including m1=1000
    T5: stage 2 ends, and peak metric values for stage 2 are logged.

    If values for m1 are < 1000 between T3 (start of stage 2) and T5 (end of stage 2), and say that the highest value for m1 during that period is 500, then we want the peak value for m1 for stage 2 to show as 500.

    There would be an ExecutorMetricsUpdate event logged (and then read) at T4 (end of stage 1), with m1=1000, which is after T3 (start of stage 2). If when reading the history log, we set the stage 2 peakExecutorMetrics to the max of current or new values from ExecutorMetricsUpdate, then the value for stage 2 would remain at 1000. However, we want it to be replaced by the value of 500 when it gets the ExecutorMetricsUpdate logged at T5 (end of stage 2). During processing of ExecutorMetricsUpdate, for the stage level metrics, it will replace all the peakExecutorMetrics if timestamp is -1.

    I can add some comments for this.
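The replace-versus-max rule described in this comment can be sketched roughly as below. This is a simplified illustration, not the PR's `PeakExecutorMetrics`: the class name `PeakTracker`, its `update` method, and the single-metric arrays are assumptions made for the example; only the convention that a timestamp of -1 means "replace all peaks" comes from the comment.

```scala
// Hypothetical, simplified stand-in for per-stage peak tracking.
class PeakTracker(numMetrics: Int) {
  private val peaks = new Array[Long](numMetrics)

  def values: List[Long] = peaks.toList

  // timestamp == -1 marks a per-stage peak record read back from the history log:
  // replace every stored peak. Any other timestamp is treated as a live update:
  // keep the element-wise maximum of the stored and the new values.
  def update(timestamp: Long, metrics: Array[Long]): Unit = {
    require(metrics.length == peaks.length, "metric count mismatch")
    if (timestamp == -1L) {
      Array.copy(metrics, 0, peaks, 0, peaks.length)
    } else {
      for (i <- peaks.indices) peaks(i) = math.max(peaks(i), metrics(i))
    }
  }
}
```

Replaying the T1 to T5 scenario against the stage 2 tracker: the record logged at T4 (m1=1000, timestamp -1) sets the stored peak to 1000, and the record logged at T5 (m1=500, timestamp -1) replaces it with 500. A max-only update would have left the stage 2 peak at 1000.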
[GitHub] spark pull request #21569: Catch TypeError when testing existence of HiveCon...
GitHub user icexelloss opened a pull request:

    https://github.com/apache/spark/pull/21569

    Catch TypeError when testing existence of HiveConf when creating pyspark shell

    ## What changes were proposed in this pull request?

    This PR catches TypeError when testing existence of HiveConf when creating pyspark shell

    ## How was this patch tested?

    Manually tested. Here are the manual test cases:

    Build with hive:
    ```
    (pyarrow-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
    Python 3.6.5 | packaged by conda-forge | (default, Apr 6 2018, 13:44:09)
    [GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    18/06/14 14:55:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
          /_/

    Using Python version 3.6.5 (default, Apr 6 2018 13:44:09)
    SparkSession available as 'spark'.
    >>> spark.conf.get('spark.sql.catalogImplementation')
    'hive'
    ```

    Build without hive:
    ```
    (pyarrow-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
    Python 3.6.5 | packaged by conda-forge | (default, Apr 6 2018, 13:44:09)
    [GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    18/06/14 15:04:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /__ / .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
          /_/

    Using Python version 3.6.5 (default, Apr 6 2018 13:44:09)
    SparkSession available as 'spark'.
    >>> spark.conf.get('spark.sql.catalogImplementation')
    'in-memory'
    ```

    Failed to start shell:
    ```
    (pyarrow-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
    Python 3.6.5 | packaged by conda-forge | (default, Apr 6 2018, 13:44:09)
    [GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    18/06/14 15:07:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    /Users/icexelloss/workspace/spark/python/pyspark/shell.py:45: UserWarning: Failed to initialize Spark session.
      warnings.warn("Failed to initialize Spark session.")
    Traceback (most recent call last):
      File "/Users/icexelloss/workspace/spark/python/pyspark/shell.py", line 41, in <module>
        spark = SparkSession._create_shell_session()
      File "/Users/icexelloss/workspace/spark/python/pyspark/sql/session.py", line 581, in _create_shell_session
        return SparkSession.builder.getOrCreate()
      File "/Users/icexelloss/workspace/spark/python/pyspark/sql/session.py", line 168, in getOrCreate
        raise py4j.protocol.Py4JError("Fake Py4JError")
    py4j.protocol.Py4JError: Fake Py4JError
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/icexelloss/spark SPARK-24563-fix-pyspark-shell-without-hive

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21569.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21569

commit 7d5b62858cd3fecbf149b0890d11b23c06356011
Author: Li Jin
Date: 2018-06-14T18:52:20Z

    Catch TypeError when testing existence of HiveConf when creating pyspark shell
[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21527 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91850/ Test PASSed.
[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21527 Merged build finished. Test PASSed.
[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21527 **[Test build #91850 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91850/testReport)** for PR 21527 at commit [`4c8acfa`](https://github.com/apache/spark/commit/4c8acfa5899ccbdafeb630f38ce44b23332b80f2). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user skonto commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21366#discussion_r195535296

    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
    @@ -0,0 +1,65 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.scheduler.cluster.k8s
    +
    +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
    +
    +import io.fabric8.kubernetes.client.KubernetesClient
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.k8s.Config._
    +import org.apache.spark.deploy.k8s.Constants._
    +import org.apache.spark.util.ThreadUtils
    +
    +private[spark] class ExecutorPodsPollingSnapshotSource(
    +    conf: SparkConf,
    +    kubernetesClient: KubernetesClient,
    +    snapshotsStore: ExecutorPodsSnapshotsStore,
    +    pollingExecutor: ScheduledExecutorService) {
    +
    +  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
    +
    +  private var pollingFuture: Future[_] = _
    +
    +  def start(applicationId: String): Unit = {
    +    require(pollingFuture == null, "Cannot start polling more than once.")
    +    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
    +      new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
    +  }
    +
    +  def stop(): Unit = {
    +    if (pollingFuture != null) {
    +      pollingFuture.cancel(true)
    +      pollingFuture = null
    +    }
    +    ThreadUtils.shutdown(pollingExecutor)
    +  }
    +
    +  private class PollRunnable(applicationId: String) extends Runnable {
    +    override def run(): Unit = {
    +      snapshotsStore.replaceSnapshot(kubernetesClient
    --- End diff --

    Yes, you need to trigger the initial creation of executors somehow, and yes, I saw that in the tests. My only concern is that this should be explicit rather than implicit, to make the code more obvious.
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21559 **[Test build #91864 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91864/testReport)** for PR 21559 at commit [`25d6de1`](https://github.com/apache/spark/commit/25d6de1db8223975ebd9b69c7ca77c26e3d8674c).
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195534314

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -93,6 +96,9 @@ private[spark] class EventLoggingListener(
       // Visible for tests only.
       private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
     
    +  // map of live stages, to peak executor metrics for the stage
    +  private val liveStageExecutorMetrics = HashMap[(Int, Int), HashMap[String, PeakExecutorMetrics]]()
    --- End diff --

    modified.
[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...
Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195533972

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1751,7 +1753,7 @@ class DAGScheduler(
         messageScheduler.shutdownNow()
         eventProcessLoop.stop()
         taskScheduler.stop()
    - }
    + }
    --- End diff --

    fixed.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20636 **[Test build #91863 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91863/testReport)** for PR 20636 at commit [`aca2ee6`](https://github.com/apache/spark/commit/aca2ee645f368a74451f5147bb3662786120e0d1). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91863/ Test FAILed.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Merged build finished. Test FAILed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user foxish commented on the issue: https://github.com/apache/spark/pull/21366 If last round's comments are addressed, LGTM from me. The important behavior to check is the snapshot, and the creation of replacement executors based on the captured snapshot.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20636 **[Test build #91863 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91863/testReport)** for PR 20636 at commit [`aca2ee6`](https://github.com/apache/spark/commit/aca2ee645f368a74451f5147bb3662786120e0d1).
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/151/ Test PASSed.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Merged build finished. Test PASSed.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4042/ Test PASSed.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Merged build finished. Test PASSed.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 retest this please
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user dvogelbacher commented on the issue: https://github.com/apache/spark/pull/21366 Agree with @mccheah on not blocking this on a design doc. This PR strictly improves the management of executor states in k8s compared to how it was done before. So we really should get this merged soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user Silberlocke commented on the issue: https://github.com/apache/spark/pull/21366 Agree with @mccheah on not blocking this on a design doc. This PR strictly improves the management of executor states in k8s compared to how it was done before. So we really should get this merged soon. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21441 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91851/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21441 **[Test build #91851 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91851/testReport)** for PR 21441 at commit [`57b545e`](https://github.com/apache/spark/commit/57b545e43b45d927c1ce6a9bf31ebba6c7073a92). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195516859 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala --- @@ -110,40 +126,61 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB def clear(): Unit = synchronized { batches.clear() +numRows = 0 + } + + private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = { --- End diff -- Can this go in MemorySinkBase?
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21221 Merged build finished. Test PASSed.
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21221 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91846/ Test PASSed.
[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21559#discussion_r195516533 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala --- @@ -294,6 +333,16 @@ class MemorySink(val schema: StructType, outputMode: OutputMode) extends Sink def clear(): Unit = synchronized { batches.clear() +numRows = 0 + } + + private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = { --- End diff -- nit: I'd document that maxRows is the remaining row capacity, not the maximum row limit defined in the options
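The distinction raised in the nit above (remaining capacity versus configured limit) can be illustrated with a minimal Python sketch; the diff itself is Scala. The function name mirrors `truncateRowsIfNeeded` from the diff, but the body, the `BoundedSink` class, and the `row_limit` option are assumptions made for illustration, not the PR's code.

```python
from typing import List, Sequence


def truncate_rows_if_needed(rows: Sequence[int], max_rows: int) -> List[int]:
    # max_rows is the REMAINING capacity of the sink,
    # not the row limit configured in the options.
    return list(rows[:max(max_rows, 0)])


class BoundedSink:
    """Hypothetical in-memory sink with a configured row limit (illustration only)."""

    def __init__(self, row_limit: int) -> None:
        self.row_limit = row_limit          # the limit defined in the options
        self.num_rows = 0                   # rows stored so far
        self.batches: List[List[int]] = []

    def add_batch(self, rows: Sequence[int]) -> None:
        # The caller converts the configured limit into remaining capacity.
        remaining = self.row_limit - self.num_rows
        kept = truncate_rows_if_needed(rows, remaining)
        self.batches.append(kept)
        self.num_rows += len(kept)

    def clear(self) -> None:
        # Resetting the counter alongside the batches restores full capacity.
        self.batches.clear()
        self.num_rows = 0
```

Passing the remaining capacity keeps the truncation helper stateless, which is why documenting the parameter's meaning matters: a caller passing the configured limit instead would silently over-fill the sink.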
[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21221 **[Test build #91846 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91846/testReport)** for PR 21221 at commit [`99044e6`](https://github.com/apache/spark/commit/99044e6ec0cdc1b760c57dd5b7e74349384c6a98). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195513995 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) -val allocatorExecutor = ThreadUtils - .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( "kubernetes-executor-requests") + +val bufferSnapshotsExecutor = ThreadUtils + .newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer") +val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor) +val removedExecutorsCache = CacheBuilder.newBuilder() + .expireAfterWrite(3, TimeUnit.MINUTES) --- End diff -- The cache is only for a best effort attempt to not remove the same executor from the scheduler backend multiple times, but at the end of the day even if we do accidentally remove multiple times the only noticeable result is noisy logs. The scheduler backend properly handles multiple attempts to remove but we'd prefer it if we didn't have to. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
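A rough Python sketch of the best-effort de-duplication described in the comment above (the diff uses a Guava cache with `expireAfterWrite(3, TimeUnit.MINUTES)` in Scala). The `ExpiringSet` class, the `remove_executor_once` helper, and the injectable clock are all hypothetical names introduced here; the sketch only mimics expire-after-write semantics and is not the PR's implementation.

```python
import time
from typing import Callable, Dict


class ExpiringSet:
    """Remembers keys for ttl_seconds after they were last written (illustration only)."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._written_at: Dict[int, float] = {}

    def add(self, key: int) -> None:
        self._written_at[key] = self._clock()

    def __contains__(self, key: int) -> bool:
        written = self._written_at.get(key)
        if written is None:
            return False
        if self._clock() - written >= self._ttl:
            # Entry has expired; forget it so the map does not grow forever.
            del self._written_at[key]
            return False
        return True


def remove_executor_once(exec_id: int, recently_removed: ExpiringSet,
                         remove: Callable[[int], None]) -> bool:
    """Call remove(exec_id) unless it was already removed within the TTL."""
    if exec_id in recently_removed:
        return False  # duplicate event: skip the call and the noisy log line
    remove(exec_id)
    recently_removed.add(exec_id)
    return True
```

Because the backend tolerates repeated removals, an expired or missing entry costs only an extra log line, which is why a short fixed TTL is enough here.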
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/150/ Test PASSed.
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Merged build finished. Test PASSed.
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21546 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4041/ Test PASSed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...
Github user mccheah commented on the issue: https://github.com/apache/spark/pull/21366 > @mccheah could you add a design doc for future reference and so that new contributors can understand better the rationale behind this. There is some description in the JIRA ticket but not enough to describe the final solution. I can do that, but would we consider that blocking the merge of this PR? I'd like to get this in soon; it's been open for a while.
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195512808 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit} + +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.util.ThreadUtils + +private[spark] class ExecutorPodsPollingSnapshotSource( +conf: SparkConf, +kubernetesClient: KubernetesClient, +snapshotsStore: ExecutorPodsSnapshotsStore, +pollingExecutor: ScheduledExecutorService) { + + private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL) + + private var pollingFuture: Future[_] = _ + + def start(applicationId: String): Unit = { +require(pollingFuture == null, "Cannot start polling more than once.") +pollingFuture = pollingExecutor.scheduleWithFixedDelay( + new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS) + } + + def stop(): Unit = { +if (pollingFuture != null) { + pollingFuture.cancel(true) + pollingFuture = null +} +ThreadUtils.shutdown(pollingExecutor) + } + + private class PollRunnable(applicationId: String) extends Runnable { +override def run(): Unit = { + snapshotsStore.replaceSnapshot(kubernetesClient --- End diff -- Not strictly why that's done here but a side-effect I suppose. Really the snapshots store should push an initial empty snapshot to all subscribers when it starts, and the unit tests do check for that - it's the responsibility of the snapshots store. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
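One possible reading of the behavior discussed above, sketched in Python rather than the PR's Scala: a store that starts with an empty snapshot and hands its current snapshot to every subscriber as soon as it subscribes. The class and method names are hypothetical stand-ins for `ExecutorPodsSnapshotsStore`, the snapshot is simplified to a dict, and delivery is synchronous, whereas the real store buffers snapshots on an executor thread.

```python
from typing import Callable, Dict, List

# Hypothetical snapshot shape: executor id -> pod phase.
Snapshot = Dict[int, str]


class SnapshotsStore:
    """Illustrative stand-in for a snapshots store; not the PR's implementation."""

    def __init__(self) -> None:
        self._current: Snapshot = {}  # the initial snapshot is empty
        self._subscribers: List[Callable[[Snapshot], None]] = []

    def add_subscriber(self, on_snapshot: Callable[[Snapshot], None]) -> None:
        self._subscribers.append(on_snapshot)
        # New subscribers immediately receive the most recent state.
        on_snapshot(dict(self._current))

    def replace_snapshot(self, pods: Snapshot) -> None:
        # A full poll result replaces the previous state wholesale.
        self._current = dict(pods)
        for subscriber in self._subscribers:
            subscriber(dict(self._current))
```

With this shape, a subscriber never depends on the polling source to deliver its first snapshot; that responsibility sits in the store, as the comment suggests.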
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195512430 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} + +import io.fabric8.kubernetes.api.model.PodBuilder +import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.mutable + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.internal.Logging +import org.apache.spark.util.{Clock, Utils} + +private[spark] class ExecutorPodsAllocator( +conf: SparkConf, +executorBuilder: KubernetesExecutorBuilder, +kubernetesClient: KubernetesClient, +snapshotsStore: ExecutorPodsSnapshotsStore, +clock: Clock) extends Logging { + + private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) + + private val totalExpectedExecutors = new AtomicInteger(0) + + private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) + + private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY) + + private val podCreationTimeout = math.max(podAllocationDelay * 5, 6) + + private val kubernetesDriverPodName = conf +.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(throw new SparkException("Must specify the driver pod name")) + + private val driverPod = kubernetesClient.pods() +.withName(kubernetesDriverPodName) +.get() + + // Executor IDs that have been requested from Kubernetes but have not been detected in any + // snapshot yet. Mapped to the timestamp when they were created. 
+ private val newlyCreatedExecutors = mutable.Map.empty[Long, Long] + + def start(applicationId: String): Unit = { +snapshotsStore.addSubscriber(podAllocationDelay) { + onNewSnapshots(applicationId, _) +} + } + + def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total) + + private def onNewSnapshots(applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = { +newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) +// For all executors we've created against the API but have not seen in a snapshot +// yet - check the current time. If the current time has exceeded some threshold, +// assume that the pod was either never created (the API server never properly +// handled the creation request), or the API server created the pod but we missed +// both the creation and deletion events. In either case, delete the missing pod +// if possible, and mark such a pod to be rescheduled below. +newlyCreatedExecutors.foreach { case (execId, timeCreated) => + if (clock.getTimeMillis() - timeCreated > podCreationTimeout) { +logWarning(s"Executor with id $execId was not detected in the Kubernetes" + + s" cluster after $podCreationTimeout milliseconds despite the fact that a" + + " previous allocation attempt tried to create it. The executor may have been" + + " deleted but the application missed the deletion event.") +Utils.tryLogNonFatalError { + kubernetesClient +.pods() +.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString) +.delete() --- End diff -- That's handled by the lifecycle manager already, because the lifecycle manager looks at what the scheduler backend believes are its executors and reconciles them with what's in the snapshot. --- - To unsubscribe,
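The two checks touched on above can be sketched in Python (the diff is Scala): the allocator's timeout test for executors that were requested but never appeared in a snapshot, and the reconciliation the comment attributes to the lifecycle manager. Both function names and the plain-integer data shapes are assumptions for illustration; only the `now - created > timeout` comparison is taken from the quoted diff.

```python
from typing import Dict, Iterable, List


def timed_out_executors(newly_created: Dict[int, int],
                        now_ms: int, timeout_ms: int) -> List[int]:
    """Executor ids requested more than timeout_ms ago and not yet seen in a snapshot.

    newly_created maps executor id -> creation timestamp in milliseconds.
    """
    return sorted(
        exec_id
        for exec_id, created_ms in newly_created.items()
        if now_ms - created_ms > timeout_ms
    )


def missing_from_snapshot(known_to_scheduler: Iterable[int],
                          snapshot_pods: Iterable[int]) -> List[int]:
    """Executors the scheduler backend believes exist but the snapshot does not contain."""
    return sorted(set(known_to_scheduler) - set(snapshot_pods))
```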
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21546 cc @icexelloss
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21546 Thanks @viirya and @HyukjinKwon !
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r195512219 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) -val allocatorExecutor = ThreadUtils - .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( "kubernetes-executor-requests") + +val bufferSnapshotsExecutor = ThreadUtils + .newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer") +val snapshotsStore = new ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor) +val removedExecutorsCache = CacheBuilder.newBuilder() + .expireAfterWrite(3, TimeUnit.MINUTES) --- End diff -- Don't think it has to be configurable. Basically we should only receive the removed executor events multiple times for a short period of time, then we should settle into steady state.
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r195512218 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -1318,18 +1318,52 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { } } - test("roundtrip payloads") { + test("roundtrip arrow batches") { val inputRows = (0 until 9).map { i => InternalRow(i) } :+ InternalRow(null) val schema = StructType(Seq(StructField("int", IntegerType, nullable = true))) val ctx = TaskContext.empty() -val payloadIter = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, null, ctx) -val outputRowIter = ArrowConverters.fromPayloadIterator(payloadIter, ctx) +val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx) +val outputRowIter = ArrowConverters.fromBatchIterator(batchIter, schema, null, ctx) -assert(schema == outputRowIter.schema) +var count = 0 +outputRowIter.zipWithIndex.foreach { case (row, i) => + if (i != 9) { +assert(row.getInt(0) == i) + } else { +assert(row.isNullAt(0)) + } + count += 1 +} + +assert(count == inputRows.length) + } + + test("ArrowBatchStreamWriter roundtrip") { +val inputRows = (0 until 9).map { i => + InternalRow(i) +} :+ InternalRow(null) + +val schema = StructType(Seq(StructField("int", IntegerType, nullable = true))) + +val ctx = TaskContext.empty() +val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx) + +// Write batches to Arrow stream format as a byte array +val out = new ByteArrayOutputStream() --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21546 **[Test build #91862 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91862/testReport)** for PR 21546 at commit [`4af58f9`](https://github.com/apache/spark/commit/4af58f9539ea12c8c309790001efe497d18f0129).
[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21559 **[Test build #91861 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91861/testReport)** for PR 21559 at commit [`b2ef59c`](https://github.com/apache/spark/commit/b2ef59c40e58cdd6efdb0f5414f16ac5358bc99a).
[GitHub] spark issue #21515: [SPARK-24372][build] Add scripts to help with preparing ...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/21515 FYI I'm playing with building 2.1.3 using the docker-based approach, and there are a few things that I need to adjust before that works. I hope to update this PR later today.
[GitHub] spark issue #21531: [SPARK-24521][SQL][TEST] Fix ineffective test in CachedT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21531 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91848/ Test PASSed.
[GitHub] spark issue #21531: [SPARK-24521][SQL][TEST] Fix ineffective test in CachedT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21531 Merged build finished. Test PASSed.
[GitHub] spark issue #21531: [SPARK-24521][SQL][TEST] Fix ineffective test in CachedT...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21531 **[Test build #91848 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91848/testReport)** for PR 21531 at commit [`c9db68d`](https://github.com/apache/spark/commit/c9db68d87a6f34f1849aadc3eaf58ed183cc2419). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r195504451 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala --- @@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils { } /** - * Python Callable function to convert ArrowPayloads into a [[DataFrame]]. + * Python callable function to convert an RDD of serialized ArrowRecordBatches into + * a [[DataFrame]]. * - * @param payloadRDD A JavaRDD of ArrowPayloads. - * @param schemaString JSON Formatted Schema for ArrowPayloads. + * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches. + * @param schemaString JSON Formatted Spark schema for Arrow batches. * @param sqlContext The active [[SQLContext]]. * @return The converted [[DataFrame]]. */ - def arrowPayloadToDataFrame( - payloadRDD: JavaRDD[Array[Byte]], + def arrowStreamToDataFrame( --- End diff -- oh right, this is only called by the function below so I suppose we don't even need it.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21068 **[Test build #91860 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91860/testReport)** for PR 21068 at commit [`aa52f6e`](https://github.com/apache/spark/commit/aa52f6edb998d21e51d0d9a73351548034515a8e).
[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21568 Merged build finished. Test PASSed.
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r195502588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala --- @@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils { } /** - * Python Callable function to convert ArrowPayloads into a [[DataFrame]]. + * Python callable function to convert an RDD of serialized ArrowRecordBatches into + * a [[DataFrame]]. * - * @param payloadRDD A JavaRDD of ArrowPayloads. - * @param schemaString JSON Formatted Schema for ArrowPayloads. + * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches. + * @param schemaString JSON Formatted Spark schema for Arrow batches. * @param sqlContext The active [[SQLContext]]. * @return The converted [[DataFrame]]. */ - def arrowPayloadToDataFrame( - payloadRDD: JavaRDD[Array[Byte]], + def arrowStreamToDataFrame( + arrowBatchRDD: JavaRDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = { -ArrowConverters.toDataFrame(payloadRDD, schemaString, sqlContext) +ArrowConverters.toDataFrame(arrowBatchRDD, schemaString, sqlContext) + } + + /** + * Python callable function to read a file in Arrow stream format and create a [[DataFrame]] + * using each serialized ArrowRecordBatch as a partition. + * + * @param sqlContext The active [[SQLContext]]. + * @param filename File to read the Arrow stream from. + * @param schemaString JSON Formatted Spark schema for Arrow batches. + * @return A new [[DataFrame]]. + */ + def arrowReadStreamFromFile( + sqlContext: SQLContext, + filename: String, + schemaString: String): DataFrame = { +JavaSparkContext.fromSparkContext(sqlContext.sparkContext) --- End diff -- oops, nothing! I must have forgotten to delete it, thanks!
[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21568 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4040/ Test PASSed.
[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21546 cc @ueshin
[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21568 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/149/ Test PASSed.
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r195501939 --- Diff: python/pyspark/serializers.py --- @@ -184,24 +184,28 @@ def loads(self, obj): raise NotImplementedError -class ArrowSerializer(FramedSerializer): +class ArrowSerializer(Serializer): --- End diff -- Maybe `ArrowStreamSerializer`?
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r195501843 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala --- @@ -1318,18 +1318,52 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll { } } - test("roundtrip payloads") { + test("roundtrip arrow batches") { val inputRows = (0 until 9).map { i => InternalRow(i) } :+ InternalRow(null) val schema = StructType(Seq(StructField("int", IntegerType, nullable = true))) val ctx = TaskContext.empty() -val payloadIter = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, null, ctx) -val outputRowIter = ArrowConverters.fromPayloadIterator(payloadIter, ctx) +val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx) +val outputRowIter = ArrowConverters.fromBatchIterator(batchIter, schema, null, ctx) -assert(schema == outputRowIter.schema) +var count = 0 +outputRowIter.zipWithIndex.foreach { case (row, i) => + if (i != 9) { +assert(row.getInt(0) == i) + } else { +assert(row.isNullAt(0)) + } + count += 1 +} + +assert(count == inputRows.length) + } + + test("ArrowBatchStreamWriter roundtrip") { +val inputRows = (0 until 9).map { i => + InternalRow(i) +} :+ InternalRow(null) + +val schema = StructType(Seq(StructField("int", IntegerType, nullable = true))) + +val ctx = TaskContext.empty() +val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx) + +// Write batches to Arrow stream format as a byte array +val out = new ByteArrayOutputStream() --- End diff -- This doesn't actually need to be closed, but I should be closing the DataOutputStream, so I'll put that in tryWithResource --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21529 Adding new queries to `SQLQueryTestSuite` is the best way to do it in the current infrastructure. Could you do your best to cover all the join algorithms for different input data and join types?
[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21568 **[Test build #91859 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91859/testReport)** for PR 21568 at commit [`ed01ff0`](https://github.com/apache/spark/commit/ed01ff0d40fbe65b3a239b196a90013119ad3580).
[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21568 cc @cloud-fan
[GitHub] spark pull request #21568: [SPARK-24562][TESTS] Support different configs fo...
GitHub user mgaido91 opened a pull request: https://github.com/apache/spark/pull/21568 [SPARK-24562][TESTS] Support different configs for same test in SQLQueryTestSuite ## What changes were proposed in this pull request? The PR proposes to add support for running the same SQL test input files against different configs, leading either to the same result or to a different one. ## How was this patch tested? Involved UTs You can merge this pull request into a Git repository by running: $ git pull https://github.com/mgaido91/spark SPARK-24562 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21568.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21568 commit ed01ff0d40fbe65b3a239b196a90013119ad3580 Author: Marco Gaido Date: 2018-06-13T15:57:17Z Different config for same test in SQLQueryTestSuite --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
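The idea in the PR description above, running one test input under several configurations and checking whether the results match or differ, can be sketched generically in Python. Everything here is hypothetical: the helper names, the `demo.upper` config key, and the toy `run` callback stand in for the real suite, whose mechanism for declaring config sets is not shown in this message.

```python
from typing import Callable, Dict, List, Tuple

Config = Dict[str, str]
Outcome = Tuple[Config, str]


def run_under_configs(query: str, configs: List[Config],
                      run: Callable[[str, Config], str]) -> List[Outcome]:
    """Run the same query once per config and pair each config with its result."""
    return [(conf, run(query, conf)) for conf in configs]


def results_agree(outcomes: List[Outcome]) -> bool:
    """True when every config produced the same result (or there are no outcomes)."""
    return len({result for _, result in outcomes}) <= 1


def toy_run(query: str, conf: Config) -> str:
    # Stand-in for executing a query: one made-up config key changes the output.
    return query.upper() if conf.get("demo.upper") == "true" else query
```

A test author would then assert `results_agree(...)` for configs expected to be equivalent, and assert the opposite, or compare against per-config golden output, where the configs are expected to change the result.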
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r195499089 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala --- @@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils { } /** - * Python Callable function to convert ArrowPayloads into a [[DataFrame]]. + * Python callable function to convert an RDD of serialized ArrowRecordBatches into + * a [[DataFrame]]. * - * @param payloadRDD A JavaRDD of ArrowPayloads. - * @param schemaString JSON Formatted Schema for ArrowPayloads. + * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches. + * @param schemaString JSON Formatted Spark schema for Arrow batches. * @param sqlContext The active [[SQLContext]]. * @return The converted [[DataFrame]]. */ - def arrowPayloadToDataFrame( - payloadRDD: JavaRDD[Array[Byte]], + def arrowStreamToDataFrame( --- End diff -- It's public so that it can be called from Python with Py4J.
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r195498764 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3236,13 +3236,49 @@ class Dataset[T] private[sql]( } /** - * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark. + * Collect a Dataset as Arrow batches and serve stream to PySpark. */ private[sql] def collectAsArrowToPython(): Array[Any] = { +val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone + withAction("collectAsArrowToPython", queryExecution) { plan => - val iter: Iterator[Array[Byte]] = -toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable) - PythonRDD.serveIterator(iter, "serve-Arrow") + PythonRDD.serveToStream("serve-Arrow") { out => +val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId) +val arrowBatchRdd = getArrowBatchRdd(plan) +val numPartitions = arrowBatchRdd.partitions.length + +// Store collection results for worst case of 1 to N-1 partitions +val results = new Array[Array[Array[Byte]]](numPartitions - 1) +var lastIndex = -1 // index of last partition written + +// Handler to eagerly write partitions to Python in order +def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = { + // If result is from next partition in order + if (index - 1 == lastIndex) { +batchWriter.writeBatches(arrowBatches.iterator) +lastIndex += 1 +// Write stored partitions that come next in order +while (lastIndex < results.length && results(lastIndex) != null) { + batchWriter.writeBatches(results(lastIndex).iterator) + results(lastIndex) = null + lastIndex += 1 +} +// After last batch, end the stream +if (lastIndex == results.length) { + batchWriter.end() +} + } else { +// Store partitions received out of order +results(index - 1) = arrowBatches + } +} + +sparkSession.sparkContext.runJob( + arrowBatchRdd, + (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray, + 0 until 
numPartitions, + handlePartitionBatches) --- End diff --

> +1 chunking if we could. I recall Bryan said for grouped UDF we need the entire set.

This still keeps Arrow record batches chunked within each partition, which can help executor memory, but it does nothing for the driver side because we still need to collect the entire partition in the driver JVM.

> Also not sure if python side we have any assumption on how much of the partition is in each chunk (there shouldn't be?)

No, Python doesn't care how many chunks the data is in; that is handled by pyarrow.
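The handler in the quoted diff writes partitions to Python in index order and holds back any partition that finishes early. A minimal Python sketch of that buffering idea follows. It is not the PR's Scala code: the class name `InOrderWriter` and the `write` and `end` callbacks are hypothetical stand-ins for the batch writer, and it assumes every partition index is reported exactly once.

```python
from typing import Callable, Dict, List


class InOrderWriter:
    """Emit partition results in index order, buffering those that arrive early.

    Hypothetical illustration only; `write` and `end` stand in for whatever
    actually sends batches to the consumer and closes the stream.
    """

    def __init__(
        self,
        num_partitions: int,
        write: Callable[[List[bytes]], None],
        end: Callable[[], None],
    ) -> None:
        self._num_partitions = num_partitions
        self._write = write
        self._end = end
        self._pending: Dict[int, List[bytes]] = {}  # results that arrived early
        self._next_index = 0  # index of the next partition to emit

    def handle(self, index: int, batches: List[bytes]) -> None:
        # Store the result, then flush every partition that is now in order.
        self._pending[index] = batches
        while self._next_index in self._pending:
            self._write(self._pending.pop(self._next_index))
            self._next_index += 1
        # Once the last partition has been written, close the stream.
        if self._next_index == self._num_partitions:
            self._end()
```

In the worst case (partition 0 finishes last) every other partition sits in the buffer at once, which is why this approach does not reduce peak memory on the collecting side.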
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20611 Merged build finished. Test FAILed.
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20611 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91852/ Test FAILed.
[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20611 **[Test build #91852 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91852/testReport)** for PR 20611 at commit [`47dec45`](https://github.com/apache/spark/commit/47dec4570bd892cd2bd78455d7e46d3a95a88be3). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21546#discussion_r195497043 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3236,13 +3236,49 @@ class Dataset[T] private[sql]( } /** - * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark. + * Collect a Dataset as Arrow batches and serve stream to PySpark. */ private[sql] def collectAsArrowToPython(): Array[Any] = { +val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone + withAction("collectAsArrowToPython", queryExecution) { plan => - val iter: Iterator[Array[Byte]] = -toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable) - PythonRDD.serveIterator(iter, "serve-Arrow") + PythonRDD.serveToStream("serve-Arrow") { out => +val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId) +val arrowBatchRdd = getArrowBatchRdd(plan) +val numPartitions = arrowBatchRdd.partitions.length + +// Store collection results for worst case of 1 to N-1 partitions +val results = new Array[Array[Array[Byte]]](numPartitions - 1) +var lastIndex = -1 // index of last partition written + +// Handler to eagerly write partitions to Python in order +def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = { + // If result is from next partition in order + if (index - 1 == lastIndex) { +batchWriter.writeBatches(arrowBatches.iterator) +lastIndex += 1 +// Write stored partitions that come next in order +while (lastIndex < results.length && results(lastIndex) != null) { + batchWriter.writeBatches(results(lastIndex).iterator) + results(lastIndex) = null + lastIndex += 1 +} +// After last batch, end the stream +if (lastIndex == results.length) { + batchWriter.end() +} + } else { +// Store partitions received out of order +results(index - 1) = arrowBatches + } +} + +sparkSession.sparkContext.runJob( + arrowBatchRdd, + (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray, + 0 until 
numPartitions, + handlePartitionBatches) --- End diff --

> is it better to incrementally run job on partitions in order

I believe this is how `toLocalIterator` works, right? I tried using that because it keeps only 1 partition in memory at a time, but the performance took quite a hit from the multiple jobs. I think we should still prioritize performance over memory for `toPandas()`, since the data to be collected is assumed to be relatively small. I did have another idea though: we could stream all partitions to Python out of order, then follow with another small batch of data that contains a map of partitionIndex to orderReceived. Then the partitions could be put into order on the Python side before making the Pandas DataFrame.
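The reordering idea in the comment above can be sketched on the Python side as follows. This is only an illustration under stated assumptions: the function name `reorder_partitions` is hypothetical, `received` is assumed to hold each partition's batches in arrival order, and the trailing batch is assumed to decode into a dict mapping partitionIndex to orderReceived. None of these come from the PR itself.

```python
from typing import Dict, List, Sequence


def reorder_partitions(
    received: Sequence[List[bytes]],
    order_received: Dict[int, int],
) -> List[bytes]:
    """Return all batches in partition order.

    received[k] holds the batches of the k-th partition to arrive;
    order_received maps partitionIndex -> k (assumed wire format).
    """
    ordered: List[bytes] = []
    for partition_index in sorted(order_received):
        ordered.extend(received[order_received[partition_index]])
    return ordered


# Partition 2 arrived first, then partition 0, then partition 1.
received = [[b"c"], [b"a1", b"a2"], [b"b"]]
order_received = {2: 0, 0: 1, 1: 2}
batches_in_order = reorder_partitions(received, order_received)
```

With this scheme the sender never needs to buffer out-of-order partitions; the receiver still holds every batch, but it must do so anyway before building the Pandas DataFrame.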
[GitHub] spark issue #21550: [SPARK-24543][SQL] Support any type as DDL string for fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21550 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91844/ Test PASSed.
[GitHub] spark issue #21550: [SPARK-24543][SQL] Support any type as DDL string for fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21550 Merged build finished. Test PASSed.
[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21529 Thanks @gatorsmile. Sorry, may I ask what you think about https://github.com/apache/spark/pull/21529#issuecomment-396707622? Thanks.
[GitHub] spark issue #21550: [SPARK-24543][SQL] Support any type as DDL string for fr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21550 **[Test build #91844 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91844/testReport)** for PR 21550 at commit [`af946b8`](https://github.com/apache/spark/commit/af946b8ada5af91428e7ab44478e920308846a59). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...
Github user tgravescs commented on the issue: https://github.com/apache/spark/pull/21558 Sorry, trying to catch up on this thread. Are we saying this is a bug in the existing output committer as well when we have a fetch failure?
[GitHub] spark issue #21476: [SPARK-24446][yarn] Properly quote library path for YARN...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21476 Merged build finished. Test PASSed.
[GitHub] spark issue #21476: [SPARK-24446][yarn] Properly quote library path for YARN...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21476 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4039/ Test PASSed.
[GitHub] spark pull request #21529: [SPARK-24495][SQL] EnsureRequirement returns wron...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21529
[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21529 Thanks! Merged to master/2.3
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Merged build finished. Test FAILed.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20636 **[Test build #91858 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91858/testReport)** for PR 20636 at commit [`aca2ee6`](https://github.com/apache/spark/commit/aca2ee645f368a74451f5147bb3662786120e0d1). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91858/ Test FAILed.
[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21529 Merged build finished. Test PASSed.
[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21529 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91843/ Test PASSed.
[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21529 **[Test build #91843 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91843/testReport)** for PR 21529 at commit [`6ef4f0d`](https://github.com/apache/spark/commit/6ef4f0df7590f0da5aa900f29292ec0fe94658fb). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20636 **[Test build #91858 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91858/testReport)** for PR 20636 at commit [`aca2ee6`](https://github.com/apache/spark/commit/aca2ee645f368a74451f5147bb3662786120e0d1).
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Merged build finished. Test PASSed.
[GitHub] spark issue #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmark bench...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21288 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4037/ Test PASSed.
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20636 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4038/ Test PASSed.