[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21564
  
**[Test build #91856 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91856/testReport)** for PR 21564 at commit [`405ba94`](https://github.com/apache/spark/commit/405ba9441973a186569bbf733907bd9445331c34).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195542619
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+require(pollingFuture == null, "Cannot start polling more than once.")
+pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+  new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (pollingFuture != null) {
+  pollingFuture.cancel(true)
+  pollingFuture = null
+}
+ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+override def run(): Unit = {
+  snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

I see. I think what we actually want is for `ExecutorPodsSnapshotsStoreImpl` to
initialize the subscriber with its current snapshot. That gives the semantics
where a new subscriber immediately receives the most up-to-date state first.
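
A minimal sketch of that idea, assuming a simplified, single-threaded store. `SnapshotStore`, `Snapshot`, and `addSubscriber` are hypothetical names used for illustration only, not the classes in this PR; a real store would also need synchronization and probably asynchronous delivery.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an executor pods snapshot: pod name -> phase.
final case class Snapshot(pods: Map[String, String])

// Hypothetical, single-threaded store (no locking, synchronous callbacks).
class SnapshotStore {
  private var current: Snapshot = Snapshot(Map.empty)
  private val subscribers = ArrayBuffer.empty[Snapshot => Unit]

  // Register a subscriber and immediately hand it the current snapshot,
  // so it starts from the latest known state instead of waiting for a change.
  def addSubscriber(onSnapshot: Snapshot => Unit): Unit = {
    subscribers += onSnapshot
    onSnapshot(current)
  }

  // Replace the whole snapshot (as a polling source would) and notify everyone.
  def replaceSnapshot(pods: Map[String, String]): Unit = {
    current = Snapshot(pods)
    subscribers.foreach(_(current))
  }
}

object SnapshotStoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new SnapshotStore
    store.replaceSnapshot(Map("exec-1" -> "Running"))

    val seen = ArrayBuffer.empty[Snapshot]
    // The subscriber receives the already-stored snapshot as soon as it registers.
    store.addSubscriber(s => seen += s)
    store.replaceSnapshot(Map("exec-1" -> "Running", "exec-2" -> "Pending"))

    seen.foreach(println)
  }
}
```

With this shape, a subscriber that registers late does not depend on the next poll or watch event to learn about pods that already exist.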





[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195542491
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -304,6 +305,11 @@ class SparkContext(config: SparkConf) extends Logging {
 _dagScheduler = ds
   }
 
+  private[spark] def heartbeater: Heartbeater = _heartbeater
+  private[spark] def heartbeater_=(hb: Heartbeater): Unit = {
--- End diff --

These aren't used -- I'll remove.





[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195542018
  
--- Diff: core/src/main/scala/org/apache/spark/status/api/v1/api.scala ---
@@ -98,14 +101,53 @@ class ExecutorSummary private[spark](
 val removeReason: Option[String],
 val executorLogs: Map[String, String],
 val memoryMetrics: Option[MemoryMetrics],
-val blacklistedInStages: Set[Int])
+val blacklistedInStages: Set[Int],
+@JsonSerialize(using = classOf[PeakMemoryMetricsSerializer])
+@JsonDeserialize(using = classOf[PeakMemoryMetricsDeserializer])
+val peakMemoryMetrics: Option[Array[Long]])
 
 class MemoryMetrics private[spark](
 val usedOnHeapStorageMemory: Long,
 val usedOffHeapStorageMemory: Long,
 val totalOnHeapStorageMemory: Long,
 val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert to array ordered by metric name */
+class PeakMemoryMetricsDeserializer extends JsonDeserializer[Option[Array[Long]]] {
+  override def deserialize(
+  jsonParser: JsonParser,
+  deserializationContext: DeserializationContext): Option[Array[Long]] = {
+val metricsMap = jsonParser.readValueAs(classOf[Option[Map[String, Object]]])
+metricsMap match {
+  case Some(metrics) =>
+Some(MetricGetter.values.map { m =>
+  metrics.getOrElse (m.name, 0L) match {
+case intVal: Int => intVal.toLong
+case longVal: Long => longVal
+  }
+}.toArray)
+  case None => None
+}
+  }
+}
+
+/** serializer for peakMemoryMetrics: convert array to map with metric name as key */
+class PeakMemoryMetricsSerializer extends JsonSerializer[Option[Array[Long]]] {
+  override def serialize(
+  metrics: Option[Array[Long]],
+  jsonGenerator: JsonGenerator,
+  serializerProvider: SerializerProvider): Unit = {
+metrics match {
+  case Some(m) =>
+val metricsMap = (0 until MetricGetter.values.length).map { idx =>
--- End diff --

It's still being used in JsonProtocol.executorMetricsToJson -- let me know 
if you'd like me to convert that to use values instead.





[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195541136
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -234,8 +272,18 @@ private[spark] class EventLoggingListener(
 }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+if (shouldLogExecutorMetricsUpdates) {
+  // For the active stages, record any new peak values for the memory metrics for the executor
+  event.executorUpdates.foreach { executorUpdates =>
+liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
+  val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
+event.execId, new PeakExecutorMetrics())
+  peakMetrics.compareAndUpdate(executorUpdates)
--- End diff --

What would be the right timestamp? Peaks for different metrics could have 
different timestamps.
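
To make the concern concrete, here is a minimal sketch of per-metric peak tracking. `PeakTracker` is a hypothetical stand-in, not the `PeakExecutorMetrics` class from this PR; only the `compareAndUpdate` name is taken from the diff, and its Boolean return value is an assumption. Because each slot is raised independently, the peaks in the final array can come from different updates, so no single timestamp describes all of them.

```scala
// Hypothetical tracker: one slot per metric, holding the largest value seen.
class PeakTracker(numMetrics: Int) {
  val peaks: Array[Long] = Array.fill(numMetrics)(Long.MinValue)

  // Raise each stored peak to the incoming value when the incoming one is larger.
  // Returns true if at least one peak changed.
  def compareAndUpdate(update: Array[Long]): Boolean = {
    require(update.length == peaks.length, "metric count mismatch")
    var updated = false
    for (i <- peaks.indices if update(i) > peaks(i)) {
      peaks(i) = update(i)
      updated = true
    }
    updated
  }
}

object PeakTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new PeakTracker(2)
    tracker.compareAndUpdate(Array(1000L, 10L)) // first update: metric 0 peaks here
    tracker.compareAndUpdate(Array(400L, 50L))  // later update: metric 1 peaks here
    println(tracker.peaks.mkString(", "))
  }
}
```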





[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21564
  
Merged build finished. Test PASSed.





[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21564
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91855/
Test PASSed.





[GitHub] spark issue #21564: [SPARK-24556][SQL] ReusedExchange should rewrite output ...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21564
  
**[Test build #91855 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91855/testReport)** for PR 21564 at commit [`0ef99cc`](https://github.com/apache/spark/commit/0ef99cc972a54fd9c98338e54a7e4e6b9a213654).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21569
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/152/
Test PASSed.





[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21569
  
**[Test build #91865 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91865/testReport)** for PR 21569 at commit [`fdc3e75`](https://github.com/apache/spark/commit/fdc3e759d5d936018fe79a799065476ab0b73ec9).





[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21569
  
Merged build finished. Test PASSed.





[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21569
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4043/
Test PASSed.





[GitHub] spark issue #21569: [SPARK-24563][PYTHON]Catch TypeError when testing existe...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21569
  
Merged build finished. Test PASSed.





[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195539837
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage completed event
--- End diff --

Tracking task start and end would add some overhead. If this is a relatively
unlikely corner case with little impact on the numbers, it may be better to
leave it as is.





[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195538848
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+if (shouldLogExecutorMetricsUpdates) {
+  // clear out any previous attempts, that did not have a stage completed event
+  val prevAttemptId = event.stageInfo.attemptNumber() - 1
+  for (attemptId <- 0 to prevAttemptId) {
+liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
+  }
+
+  // log the peak executor metrics for the stage, for each live executor,
+  // whether or not the executor is running tasks for the stage
+  val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
+  val executorMap = liveStageExecutorMetrics.remove(
+(event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+  executorMap.foreach {
+   executorEntry => {
+  for ((executorId, peakExecutorMetrics) <- executorEntry) {
+val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
--- End diff --

We need to pass in a value for the timestamp, but there isn't really a
meaningful one for the peak metrics, since each metric's peak could have
occurred at a different time.

During processing, a timestamp of -1 indicates that the event comes from the
history log and contains the peak values for the stage that is just ending.
When updating the stage executor peaks (the peak executor values stored for
each active stage), we can then replace all of the peak executor metric values,
instead of updating each metric with the max of the current and new values.

As an example, suppose we have the following scenario:
T1: start of stage 1
T2: peak value of 1000 for metric m1
T3: start of stage 2
T4: stage 1 ends, and peak metric values for stage 1 are logged, including m1=1000
T5: stage 2 ends, and peak metric values for stage 2 are logged.

If the values for m1 stay below 1000 between T3 (start of stage 2) and T5 (end
of stage 2), with the highest value during that period being, say, 500, then we
want the peak value of m1 for stage 2 to show as 500.

An ExecutorMetricsUpdate event with m1=1000 would be logged (and later read) at
T4 (end of stage 1), which is after T3 (start of stage 2). If, when reading the
history log, we set the stage 2 peakExecutorMetrics to the max of the current
and new values from ExecutorMetricsUpdate, then the value for stage 2 would
remain at 1000. However, we want it to be replaced by the value of 500 when the
ExecutorMetricsUpdate logged at T5 (end of stage 2) is processed. So when
processing an ExecutorMetricsUpdate, the stage-level metrics replace all of the
peakExecutorMetrics if the timestamp is -1.

I can add some comments for this.
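
A minimal sketch of the replay-side rule described above, under simplifying assumptions: `MetricsUpdate` and `StagePeaks` are hypothetical types for illustration (not the PR's classes), and a single metric m1 stands in for the full metric array. A timestamp of -1 replaces the stored peaks; any other timestamp takes the per-metric max.

```scala
// Hypothetical event: a timestamp plus one value per metric.
final case class MetricsUpdate(timestamp: Long, metrics: Array[Long])

// Hypothetical holder for the peak values of one active stage during replay.
class StagePeaks(numMetrics: Int) {
  private var peaks: Array[Long] = Array.fill(numMetrics)(0L)

  def update(u: MetricsUpdate): Unit = {
    peaks =
      if (u.timestamp == -1L) {
        // Logged at a stage end: the values are already peaks, so replace.
        u.metrics.clone()
      } else {
        // Ordinary update: keep the larger value for each metric.
        peaks.zip(u.metrics).map { case (cur, next) => math.max(cur, next) }
      }
  }

  def current: List[Long] = peaks.toList
}

object ReplayDemo {
  def main(args: Array[String]): Unit = {
    val stage2 = new StagePeaks(1)
    stage2.update(MetricsUpdate(-1L, Array(1000L))) // T4: stage 1 peaks, read while stage 2 is active
    stage2.update(MetricsUpdate(-1L, Array(500L)))  // T5: stage 2 peaks replace the stale 1000
    println(stage2.current)
  }
}
```

With max-only semantics the second update would leave m1 at 1000, which is the stage 1 peak rather than the stage 2 peak.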





[GitHub] spark pull request #21569: Catch TypeError when testing existence of HiveCon...

2018-06-14 Thread icexelloss
GitHub user icexelloss opened a pull request:

https://github.com/apache/spark/pull/21569

Catch TypeError when testing existence of HiveConf when creating pyspark shell

## What changes were proposed in this pull request?

This PR catches `TypeError` when testing for the existence of `HiveConf` while
creating the pyspark shell.

## How was this patch tested?

Manually tested. Here are the manual test cases:

Build with hive:
```
(pyarrow-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
Python 3.6.5 | packaged by conda-forge | (default, Apr  6 2018, 13:44:09) 
[GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
18/06/14 14:55:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Python version 3.6.5 (default, Apr  6 2018 13:44:09)
SparkSession available as 'spark'.
>>> spark.conf.get('spark.sql.catalogImplementation')
'hive'
```

Build without hive:
```
(pyarrow-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
Python 3.6.5 | packaged by conda-forge | (default, Apr  6 2018, 13:44:09) 
[GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
18/06/14 15:04:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Python version 3.6.5 (default, Apr  6 2018 13:44:09)
SparkSession available as 'spark'.
>>> spark.conf.get('spark.sql.catalogImplementation')
'in-memory'
```

Failed to start shell:
```
(pyarrow-dev) Lis-MacBook-Pro:spark icexelloss$ bin/pyspark
Python 3.6.5 | packaged by conda-forge | (default, Apr  6 2018, 13:44:09) 
[GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
18/06/14 15:07:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/Users/icexelloss/workspace/spark/python/pyspark/shell.py:45: UserWarning: Failed to initialize Spark session.
  warnings.warn("Failed to initialize Spark session.")
Traceback (most recent call last):
  File "/Users/icexelloss/workspace/spark/python/pyspark/shell.py", line 41, in <module>
    spark = SparkSession._create_shell_session()
  File "/Users/icexelloss/workspace/spark/python/pyspark/sql/session.py", line 581, in _create_shell_session
    return SparkSession.builder.getOrCreate()
  File "/Users/icexelloss/workspace/spark/python/pyspark/sql/session.py", line 168, in getOrCreate
    raise py4j.protocol.Py4JError("Fake Py4JError")
py4j.protocol.Py4JError: Fake Py4JError
```



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/icexelloss/spark SPARK-24563-fix-pyspark-shell-without-hive

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21569.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21569


commit 7d5b62858cd3fecbf149b0890d11b23c06356011
Author: Li Jin 
Date:   2018-06-14T18:52:20Z

Catch TypeError when testing existence of HiveConf when creating pyspark shell







[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21527
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91850/
Test PASSed.





[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21527
  
Merged build finished. Test PASSed.





[GitHub] spark issue #21527: [SPARK-24519] MapStatus has 2000 hardcoded

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21527
  
**[Test build #91850 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91850/testReport)** for PR 21527 at commit [`4c8acfa`](https://github.com/apache/spark/commit/4c8acfa5899ccbdafeb630f38ce44b23332b80f2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195535296
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+require(pollingFuture == null, "Cannot start polling more than once.")
+pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+  new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (pollingFuture != null) {
+  pollingFuture.cancel(true)
+  pollingFuture = null
+}
+ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+override def run(): Unit = {
+  snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Yes, you need to trigger the initial creation of executors somehow, and yes, I
saw that in the tests. My only concern is that this should be explicit rather
than implicit, to make the code more obvious.





[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21559
  
**[Test build #91864 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91864/testReport)** for PR 21559 at commit [`25d6de1`](https://github.com/apache/spark/commit/25d6de1db8223975ebd9b69c7ca77c26e3d8674c).





[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195534314
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -93,6 +96,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
+  // map of live stages, to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = HashMap[(Int, Int), HashMap[String, PeakExecutorMetrics]]()
--- End diff --

modified.





[GitHub] spark pull request #21221: [SPARK-23429][CORE] Add executor memory metrics t...

2018-06-14 Thread edwinalu
Github user edwinalu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21221#discussion_r195533972
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1751,7 +1753,7 @@ class DAGScheduler(
 messageScheduler.shutdownNow()
 eventProcessLoop.stop()
 taskScheduler.stop()
-  }
+   }
--- End diff --

fixed.





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20636
  
**[Test build #91863 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91863/testReport)** for PR 20636 at commit [`aca2ee6`](https://github.com/apache/spark/commit/aca2ee645f368a74451f5147bb3662786120e0d1).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91863/
Test FAILed.





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Merged build finished. Test FAILed.





[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...

2018-06-14 Thread foxish
Github user foxish commented on the issue:

https://github.com/apache/spark/pull/21366
  
If the last round's comments are addressed, LGTM from me. The important
behavior to check is the snapshot, and the creation of replacement executors
based on the captured snapshot.





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20636
  
**[Test build #91863 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91863/testReport)** for PR 20636 at commit [`aca2ee6`](https://github.com/apache/spark/commit/aca2ee645f368a74451f5147bb3662786120e0d1).





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/151/
Test PASSed.





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Merged build finished. Test PASSed.





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4042/
Test PASSed.





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Merged build finished. Test PASSed.





[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
retest this please





[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...

2018-06-14 Thread dvogelbacher
Github user dvogelbacher commented on the issue:

https://github.com/apache/spark/pull/21366
  
Agree with @mccheah on not blocking this on a design doc. This PR strictly 
improves the management of executor states in k8s compared to how it was done 
before. So we really should get this merged soon.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...

2018-06-14 Thread Silberlocke
Github user Silberlocke commented on the issue:

https://github.com/apache/spark/pull/21366
  
Agree with @mccheah on not blocking this on a design doc. This PR strictly 
improves the management of executor states in k8s compared to how it was done 
before. So we really should get this merged soon.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21441
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91851/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21441: [DO-NOT-MERGE] Run tests against hadoop-3.1 to see the t...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21441
  
**[Test build #91851 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91851/testReport)**
 for PR 21441 at commit 
[`57b545e`](https://github.com/apache/spark/commit/57b545e43b45d927c1ce6a9bf31ebba6c7073a92).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-14 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195516859
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
 ---
@@ -110,40 +126,61 @@ class MemorySinkV2 extends DataSourceV2 with 
StreamWriteSupport with MemorySinkB
 
   def clear(): Unit = synchronized {
 batches.clear()
+numRows = 0
+  }
+
+  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, 
batchId: Long): Array[Row] = {
--- End diff --

Can this go in MemorySinkBase?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21221
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21221
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91846/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21559: [SPARK-24525][SS] Provide an option to limit numb...

2018-06-14 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/21559#discussion_r195516533
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
---
@@ -294,6 +333,16 @@ class MemorySink(val schema: StructType, outputMode: 
OutputMode) extends Sink
 
   def clear(): Unit = synchronized {
 batches.clear()
+numRows = 0
+  }
+
+  private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, 
batchId: Long): Array[Row] = {
--- End diff --

nit: I'd document that maxRows is the remaining row capacity, not the 
maximum row limit defined in the options
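
To illustrate the distinction, here is a minimal sketch, not the PR's actual code. The names `remainingRows` and `remainingCapacity` and the generic element type are assumptions made for the example; the real method takes `Array[Row]` and a `batchId`.

```scala
object TruncateSketch {
  /**
   * Truncates a batch so it fits into the sink.
   *
   * @param rows          rows of the incoming batch
   * @param remainingRows remaining row capacity of the sink (hypothetical name);
   *                      this is NOT the maximum row limit set in the options
   */
  def truncateRowsIfNeeded[T](rows: Seq[T], remainingRows: Int): Seq[T] =
    if (rows.length > remainingRows) rows.take(math.max(remainingRows, 0)) else rows

  /** Caller side: derive the remaining capacity from the configured limit. */
  def remainingCapacity(maxRowsOption: Int, numRowsStored: Int): Int =
    math.max(maxRowsOption - numRowsStored, 0)
}
```

With a limit of 5 and 3 rows already stored, a 4-row batch is cut down to its first 2 rows.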


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21221: [SPARK-23429][CORE] Add executor memory metrics to heart...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21221
  
**[Test build #91846 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91846/testReport)**
 for PR 21221 at commit 
[`99044e6`](https://github.com/apache/spark/commit/99044e6ec0cdc1b760c57dd5b7e74349384c6a98).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195513995
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 ---
@@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
   Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
   Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-val allocatorExecutor = ThreadUtils
-  .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
 val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
   "kubernetes-executor-requests")
+
+val bufferSnapshotsExecutor = ThreadUtils
+  
.newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer")
+val snapshotsStore = new 
ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor)
+val removedExecutorsCache = CacheBuilder.newBuilder()
+  .expireAfterWrite(3, TimeUnit.MINUTES)
--- End diff --

The cache is only for a best effort attempt to not remove the same executor 
from the scheduler backend multiple times, but at the end of the day even if we 
do accidentally remove multiple times the only noticeable result is noisy logs. 
The scheduler backend properly handles multiple attempts to remove but we'd 
prefer it if we didn't have to.
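
A rough sketch of the best-effort idea, using only the Scala standard library rather than the Guava cache in the diff. `RecentlyRemoved` and `markRemoved` are hypothetical names, and the clock is injected so the expiry can be exercised without waiting.

```scala
import scala.collection.mutable

// Hypothetical helper, not part of Spark: remembers executor ids for a fixed time window.
final class RecentlyRemoved(ttlMillis: Long, now: () => Long) {
  // executor id -> time at which the removal was first forwarded
  private val removedAt = mutable.Map.empty[Long, Long]

  /** Returns true if the removal should be forwarded, false if it was already seen recently. */
  def markRemoved(execId: Long): Boolean = synchronized {
    val t = now()
    // Drop entries older than the window so the map does not grow without bound.
    val expired = removedAt.collect { case (id, ts) if t - ts >= ttlMillis => id }.toList
    removedAt --= expired
    if (removedAt.contains(execId)) {
      false
    } else {
      removedAt(execId) = t
      true
    }
  }
}
```

Once an entry expires, a repeated removal is simply forwarded again. That matches the point above: the backend tolerates duplicates, so the window only has to be long enough to cut down log noise.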


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21546
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/150/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21546
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21546
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4041/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21366: [SPARK-24248][K8S] Use level triggering and state reconc...

2018-06-14 Thread mccheah
Github user mccheah commented on the issue:

https://github.com/apache/spark/pull/21366
  
> @mccheah could you add a design doc for future reference and so that new 
contributors can understand better the rationale behind this. There is some 
description in the JIRA ticket but not enough to describe the final solution.

I can do that, but would we consider that blocking the merge of this PR? 
I'd like to get this in soon; it's been open for a while.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195512808
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ThreadUtils
+
+private[spark] class ExecutorPodsPollingSnapshotSource(
+conf: SparkConf,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+pollingExecutor: ScheduledExecutorService) {
+
+  private val pollingInterval = 
conf.get(KUBERNETES_EXECUTOR_API_POLLING_INTERVAL)
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+require(pollingFuture == null, "Cannot start polling more than once.")
+pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+  new PollRunnable(applicationId), pollingInterval, pollingInterval, 
TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (pollingFuture != null) {
+  pollingFuture.cancel(true)
+  pollingFuture = null
+}
+ThreadUtils.shutdown(pollingExecutor)
+  }
+
+  private class PollRunnable(applicationId: String) extends Runnable {
+override def run(): Unit = {
+  snapshotsStore.replaceSnapshot(kubernetesClient
--- End diff --

Not strictly why that's done here but a side-effect I suppose. Really the 
snapshots store should push an initial empty snapshot to all subscribers when 
it starts, and the unit tests do check for that - it's the responsibility of 
the snapshots store.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195512430
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.PodBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{Clock, Utils}
+
+private[spark] class ExecutorPodsAllocator(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+snapshotsStore: ExecutorPodsSnapshotsStore,
+clock: Clock) extends Logging {
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val podCreationTimeout = math.max(podAllocationDelay * 5, 6)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Executor IDs that have been requested from Kubernetes but have not 
been detected in any
+  // snapshot yet. Mapped to the timestamp when they were created.
+  private val newlyCreatedExecutors = mutable.Map.empty[Long, Long]
+
+  def start(applicationId: String): Unit = {
+snapshotsStore.addSubscriber(podAllocationDelay) {
+  onNewSnapshots(applicationId, _)
+}
+  }
+
+  def setTotalExpectedExecutors(total: Int): Unit = 
totalExpectedExecutors.set(total)
+
+  private def onNewSnapshots(applicationId: String, snapshots: 
Seq[ExecutorPodsSnapshot]): Unit = {
+newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
+// For all executors we've created against the API but have not seen 
in a snapshot
+// yet - check the current time. If the current time has exceeded some 
threshold,
+// assume that the pod was either never created (the API server never 
properly
+// handled the creation request), or the API server created the pod 
but we missed
+// both the creation and deletion events. In either case, delete the 
missing pod
+// if possible, and mark such a pod to be rescheduled below.
+newlyCreatedExecutors.foreach { case (execId, timeCreated) =>
+  if (clock.getTimeMillis() - timeCreated > podCreationTimeout) {
+logWarning(s"Executor with id $execId was not detected in the 
Kubernetes" +
+  s" cluster after $podCreationTimeout milliseconds despite the 
fact that a" +
+  " previous allocation attempt tried to create it. The executor 
may have been" +
+  " deleted but the application missed the deletion event.")
+Utils.tryLogNonFatalError {
+  kubernetesClient
+.pods()
+.withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+.delete()
--- End diff --

That's handled by the lifecycle manager already, because the lifecycle 
manager looks at what the scheduler backend believes are its executors and 
reconciles them with what's in the snapshot.
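
As a sketch of that reconciliation step only (the object and method names are made up for illustration, and the real lifecycle manager does more than this), the core is a set difference between what the backend tracks and what the snapshot contains:

```scala
object ReconcileSketch {
  /** Executor ids the scheduler backend still tracks but that are absent from the latest snapshot. */
  def missingFromSnapshot(knownToBackend: Set[Long], inSnapshot: Set[Long]): Set[Long] =
    knownToBackend.diff(inSnapshot)
}
```

Anything returned here would be a candidate for removal from the scheduler backend.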


---

-
To unsubscribe, 

[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/21546
  
cc @icexelloss 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/21546
  
Thanks @viirya and @HyukjinKwon !


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use level triggering and state...

2018-06-14 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r195512219
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 ---
@@ -56,17 +58,44 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
   Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
   Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
 
-val allocatorExecutor = ThreadUtils
-  .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator")
 val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool(
   "kubernetes-executor-requests")
+
+val bufferSnapshotsExecutor = ThreadUtils
+  
.newDaemonSingleThreadScheduledExecutor("kubernetes-executor-snapshots-buffer")
+val snapshotsStore = new 
ExecutorPodsSnapshotsStoreImpl(bufferSnapshotsExecutor)
+val removedExecutorsCache = CacheBuilder.newBuilder()
+  .expireAfterWrite(3, TimeUnit.MINUTES)
--- End diff --

Don't think it has to be configurable. Basically we should only receive the 
removed executor events multiple times for a short period of time, then we 
should settle into steady state.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r195512218
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
 ---
@@ -1318,18 +1318,52 @@ class ArrowConvertersSuite extends SharedSQLContext 
with BeforeAndAfterAll {
 }
   }
 
-  test("roundtrip payloads") {
+  test("roundtrip arrow batches") {
 val inputRows = (0 until 9).map { i =>
   InternalRow(i)
 } :+ InternalRow(null)
 
 val schema = StructType(Seq(StructField("int", IntegerType, nullable = 
true)))
 
 val ctx = TaskContext.empty()
-val payloadIter = 
ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, null, ctx)
-val outputRowIter = ArrowConverters.fromPayloadIterator(payloadIter, 
ctx)
+val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, 
schema, 5, null, ctx)
+val outputRowIter = ArrowConverters.fromBatchIterator(batchIter, 
schema, null, ctx)
 
-assert(schema == outputRowIter.schema)
+var count = 0
+outputRowIter.zipWithIndex.foreach { case (row, i) =>
+  if (i != 9) {
+assert(row.getInt(0) == i)
+  } else {
+assert(row.isNullAt(0))
+  }
+  count += 1
+}
+
+assert(count == inputRows.length)
+  }
+
+  test("ArrowBatchStreamWriter roundtrip") {
+val inputRows = (0 until 9).map { i =>
+  InternalRow(i)
+} :+ InternalRow(null)
+
+val schema = StructType(Seq(StructField("int", IntegerType, nullable = 
true)))
+
+val ctx = TaskContext.empty()
+val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, 
schema, 5, null, ctx)
+
+// Write batches to Arrow stream format as a byte array
+val out = new ByteArrayOutputStream()
--- End diff --

done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21546
  
**[Test build #91862 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91862/testReport)**
 for PR 21546 at commit 
[`4af58f9`](https://github.com/apache/spark/commit/4af58f9539ea12c8c309790001efe497d18f0129).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21559: [SPARK-24525][SS] Provide an option to limit number of r...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21559
  
**[Test build #91861 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91861/testReport)**
 for PR 21559 at commit 
[`b2ef59c`](https://github.com/apache/spark/commit/b2ef59c40e58cdd6efdb0f5414f16ac5358bc99a).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21515: [SPARK-24372][build] Add scripts to help with preparing ...

2018-06-14 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/21515
  
FYI I'm playing with building 2.1.3 using the docker-based approach, and 
there are a few things that I need to adjust before that works. I hope to 
update this PR later today.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21531: [SPARK-24521][SQL][TEST] Fix ineffective test in CachedT...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21531
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91848/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21531: [SPARK-24521][SQL][TEST] Fix ineffective test in CachedT...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21531
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21531: [SPARK-24521][SQL][TEST] Fix ineffective test in CachedT...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21531
  
**[Test build #91848 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91848/testReport)**
 for PR 21531 at commit 
[`c9db68d`](https://github.com/apache/spark/commit/c9db68d87a6f34f1849aadc3eaf58ed183cc2419).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r195504451
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
@@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils {
   }
 
   /**
-   * Python Callable function to convert ArrowPayloads into a 
[[DataFrame]].
+   * Python callable function to convert an RDD of serialized 
ArrowRecordBatches into
+   * a [[DataFrame]].
*
-   * @param payloadRDD A JavaRDD of ArrowPayloads.
-   * @param schemaString JSON Formatted Schema for ArrowPayloads.
+   * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
* @param sqlContext The active [[SQLContext]].
* @return The converted [[DataFrame]].
*/
-  def arrowPayloadToDataFrame(
-  payloadRDD: JavaRDD[Array[Byte]],
+  def arrowStreamToDataFrame(
--- End diff --

Oh right, this is only called by the function below, so I suppose we don't 
even need it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21068: [SPARK-16630][YARN] Blacklist a node if executors won't ...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21068
  
**[Test build #91860 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91860/testReport)**
 for PR 21068 at commit 
[`aa52f6e`](https://github.com/apache/spark/commit/aa52f6edb998d21e51d0d9a73351548034515a8e).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21568
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r195502588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
@@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils {
   }
 
   /**
-   * Python Callable function to convert ArrowPayloads into a 
[[DataFrame]].
+   * Python callable function to convert an RDD of serialized 
ArrowRecordBatches into
+   * a [[DataFrame]].
*
-   * @param payloadRDD A JavaRDD of ArrowPayloads.
-   * @param schemaString JSON Formatted Schema for ArrowPayloads.
+   * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
* @param sqlContext The active [[SQLContext]].
* @return The converted [[DataFrame]].
*/
-  def arrowPayloadToDataFrame(
-  payloadRDD: JavaRDD[Array[Byte]],
+  def arrowStreamToDataFrame(
+  arrowBatchRDD: JavaRDD[Array[Byte]],
   schemaString: String,
   sqlContext: SQLContext): DataFrame = {
-ArrowConverters.toDataFrame(payloadRDD, schemaString, sqlContext)
+ArrowConverters.toDataFrame(arrowBatchRDD, schemaString, sqlContext)
+  }
+
+  /**
+   * Python callable function to read a file in Arrow stream format and 
create a [[DataFrame]]
+   * using each serialized ArrowRecordBatch as a partition.
+   *
+   * @param sqlContext The active [[SQLContext]].
+   * @param filename File to read the Arrow stream from.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
+   * @return A new [[DataFrame]].
+   */
+  def arrowReadStreamFromFile(
+  sqlContext: SQLContext,
+  filename: String,
+  schemaString: String): DataFrame = {
+JavaSparkContext.fromSparkContext(sqlContext.sparkContext)
--- End diff --

Oops, nothing! I must have forgotten to delete it, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21568
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4040/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream format ...

2018-06-14 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21546
  
cc @ueshin 



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21568
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/149/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r195501939
  
--- Diff: python/pyspark/serializers.py ---
@@ -184,24 +184,28 @@ def loads(self, obj):
 raise NotImplementedError
 
 
-class ArrowSerializer(FramedSerializer):
+class ArrowSerializer(Serializer):
--- End diff --

Maybe `ArrowStreamSerializer`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r195501843
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
@@ -1318,18 +1318,52 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
     }
   }
 
-  test("roundtrip payloads") {
+  test("roundtrip arrow batches") {
     val inputRows = (0 until 9).map { i =>
       InternalRow(i)
     } :+ InternalRow(null)
 
     val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
 
     val ctx = TaskContext.empty()
-    val payloadIter = ArrowConverters.toPayloadIterator(inputRows.toIterator, schema, 0, null, ctx)
-    val outputRowIter = ArrowConverters.fromPayloadIterator(payloadIter, ctx)
+    val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)
+    val outputRowIter = ArrowConverters.fromBatchIterator(batchIter, schema, null, ctx)
 
-    assert(schema == outputRowIter.schema)
+    var count = 0
+    outputRowIter.zipWithIndex.foreach { case (row, i) =>
+      if (i != 9) {
+        assert(row.getInt(0) == i)
+      } else {
+        assert(row.isNullAt(0))
+      }
+      count += 1
+    }
+
+    assert(count == inputRows.length)
+  }
+
+  test("ArrowBatchStreamWriter roundtrip") {
+    val inputRows = (0 until 9).map { i =>
+      InternalRow(i)
+    } :+ InternalRow(null)
+
+    val schema = StructType(Seq(StructField("int", IntegerType, nullable = true)))
+
+    val ctx = TaskContext.empty()
+    val batchIter = ArrowConverters.toBatchIterator(inputRows.toIterator, schema, 5, null, ctx)
+
+    // Write batches to Arrow stream format as a byte array
+    val out = new ByteArrayOutputStream()
--- End diff --

This doesn't actually need to be closed, but I should be closing the 
`DataOutputStream`, so I'll put that in `tryWithResource`.


---




[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...

2018-06-14 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21529
  
Adding new queries to `SQLQueryTestSuite` is the best way to do it in the 
current infrastructure. Could you do your best to cover all the join 
algorithms for different input data and join types?


---




[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21568
  
**[Test build #91859 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91859/testReport)**
 for PR 21568 at commit 
[`ed01ff0`](https://github.com/apache/spark/commit/ed01ff0d40fbe65b3a239b196a90013119ad3580).


---




[GitHub] spark issue #21568: [SPARK-24562][TESTS] Support different configs for same ...

2018-06-14 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/21568
  
cc @cloud-fan 


---




[GitHub] spark pull request #21568: [SPARK-24562][TESTS] Support different configs fo...

2018-06-14 Thread mgaido91
GitHub user mgaido91 opened a pull request:

https://github.com/apache/spark/pull/21568

[SPARK-24562][TESTS] Support different configs for same test in 
SQLQueryTestSuite

## What changes were proposed in this pull request?

The PR proposes to add support for running the same SQL test input files 
against different configs, leading either to the same result or to a different 
one.
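
As a rough illustration of the idea only (this is not the code in the PR), running one test input under several config sets and checking whether the outputs agree could be sketched as below. Every name here (`run_under_configs`, `outputs_agree`, `fake_runner`, and the `case.sensitive` key) is hypothetical and stands in for the real suite and real Spark configs.

```python
from typing import Callable, Dict, List, Tuple

Config = Dict[str, str]


def run_under_configs(run_query: Callable[[str, Config], str],
                      query: str,
                      config_sets: List[Config]) -> List[Tuple[Config, str]]:
    """Run one query under each config set and collect (config, output) pairs."""
    return [(conf, run_query(query, conf)) for conf in config_sets]


def outputs_agree(results: List[Tuple[Config, str]]) -> bool:
    """True when every config set produced the same output."""
    return len({output for _, output in results}) <= 1


# Stand-in runner: the output depends only on a made-up "case.sensitive" key.
def fake_runner(query: str, conf: Config) -> str:
    return query if conf.get("case.sensitive") == "true" else query.lower()


results = run_under_configs(
    fake_runner,
    "SELECT A",
    [{"case.sensitive": "true"}, {"case.sensitive": "false"}],
)
```

Whether differing outputs count as a failure is a per-test decision in this sketch: some inputs are expected to give the same result under every config, others a different one.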

## How was this patch tested?

Involved UTs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/spark SPARK-24562

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21568.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21568


commit ed01ff0d40fbe65b3a239b196a90013119ad3580
Author: Marco Gaido 
Date:   2018-06-13T15:57:17Z

Different config for same test in SQLQueryTestSuite




---




[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r195499089
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala ---
@@ -34,17 +34,36 @@ private[sql] object PythonSQLUtils {
   }
 
   /**
-   * Python Callable function to convert ArrowPayloads into a [[DataFrame]].
+   * Python callable function to convert an RDD of serialized ArrowRecordBatches into
+   * a [[DataFrame]].
    *
-   * @param payloadRDD A JavaRDD of ArrowPayloads.
-   * @param schemaString JSON Formatted Schema for ArrowPayloads.
+   * @param arrowBatchRDD A JavaRDD of serialized ArrowRecordBatches.
+   * @param schemaString JSON Formatted Spark schema for Arrow batches.
    * @param sqlContext The active [[SQLContext]].
    * @return The converted [[DataFrame]].
    */
-  def arrowPayloadToDataFrame(
-      payloadRDD: JavaRDD[Array[Byte]],
+  def arrowStreamToDataFrame(

It's public so that it can be called from Python via Py4J.


---




[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r195498764
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3236,13 +3236,49 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to PySpark.
    */
   private[sql] def collectAsArrowToPython(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
     withAction("collectAsArrowToPython", queryExecution) { plan =>
-      val iter: Iterator[Array[Byte]] =
-        toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable)
-      PythonRDD.serveIterator(iter, "serve-Arrow")
+      PythonRDD.serveToStream("serve-Arrow") { out =>
+        val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
+        val arrowBatchRdd = getArrowBatchRdd(plan)
+        val numPartitions = arrowBatchRdd.partitions.length
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
+        var lastIndex = -1  // index of last partition written
+
+        // Handler to eagerly write partitions to Python in order
+        def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+          // If result is from next partition in order
+          if (index - 1 == lastIndex) {
+            batchWriter.writeBatches(arrowBatches.iterator)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              batchWriter.writeBatches(results(lastIndex).iterator)
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+            // After last batch, end the stream
+            if (lastIndex == results.length) {
+              batchWriter.end()
+            }
+          } else {
+            // Store partitions received out of order
+            results(index - 1) = arrowBatches
+          }
+        }
+
+        sparkSession.sparkContext.runJob(
+          arrowBatchRdd,
+          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
+          0 until numPartitions,
+          handlePartitionBatches)
--- End diff --

> +1 chunking if we could. I recall Bryan said for grouped UDF we need the 
> entire set.

This still keeps Arrow record batches chunked within each partition, which 
can help executor memory, but it doesn't do anything for the driver side 
because we still need to collect the entire partition in the driver JVM.

> Also not sure if python side we have any assumption on how much of the 
> partition is in each chunk (there shouldn't be?)

No, Python doesn't care how many chunks the data is in; that is handled by 
pyarrow.
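
The handler in the diff above writes a partition as soon as every earlier partition has been written and buffers the ones that arrive early. A minimal Python sketch of that pattern follows, assuming 0-based partition indices; `InOrderWriter` and its `write` callback are hypothetical names for illustration, and this is a simplified model rather than the PR's code.

```python
from typing import Callable, Dict, List


class InOrderWriter:
    """Emit partition results in index order, buffering early arrivals."""

    def __init__(self, num_partitions: int, write: Callable[[List[bytes]], None]):
        self._num_partitions = num_partitions
        self._write = write                       # hypothetical sink, e.g. a stream writer
        self._pending: Dict[int, List[bytes]] = {}
        self._next_index = 0                      # next partition index to emit
        self.finished = False

    def handle(self, index: int, batches: List[bytes]) -> None:
        """Called once per partition, in whatever order results arrive."""
        self._pending[index] = batches
        # Flush every partition that is now contiguous with what was already written.
        while self._next_index in self._pending:
            self._write(self._pending.pop(self._next_index))
            self._next_index += 1
        if self._next_index == self._num_partitions:
            self.finished = True                  # all partitions written; the stream can end


written: List[List[bytes]] = []
writer = InOrderWriter(3, written.append)
# Partitions arrive as 2, 0, 1 but are written as 0, 1, 2.
for idx, data in [(2, [b"c"]), (0, [b"a"]), (1, [b"b"])]:
    writer.handle(idx, data)
```

In the worst case, when partition 0 arrives last, all other partitions sit in the buffer at once, which is the driver-side memory cost discussed above.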


---




[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20611
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20611
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91852/
Test FAILed.


---




[GitHub] spark issue #20611: [SPARK-23425][SQL]Support wildcard in HDFS path for load...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20611
  
**[Test build #91852 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91852/testReport)**
 for PR 20611 at commit 
[`47dec45`](https://github.com/apache/spark/commit/47dec4570bd892cd2bd78455d7e46d3a95a88be3).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #21546: [WIP][SPARK-23030][SQL][PYTHON] Use Arrow stream ...

2018-06-14 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21546#discussion_r195497043
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3236,13 +3236,49 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Collect a Dataset as ArrowPayload byte arrays and serve to PySpark.
+   * Collect a Dataset as Arrow batches and serve stream to PySpark.
    */
   private[sql] def collectAsArrowToPython(): Array[Any] = {
+    val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
+
     withAction("collectAsArrowToPython", queryExecution) { plan =>
-      val iter: Iterator[Array[Byte]] =
-        toArrowPayload(plan).collect().iterator.map(_.asPythonSerializable)
-      PythonRDD.serveIterator(iter, "serve-Arrow")
+      PythonRDD.serveToStream("serve-Arrow") { out =>
+        val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
+        val arrowBatchRdd = getArrowBatchRdd(plan)
+        val numPartitions = arrowBatchRdd.partitions.length
+
+        // Store collection results for worst case of 1 to N-1 partitions
+        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
+        var lastIndex = -1  // index of last partition written
+
+        // Handler to eagerly write partitions to Python in order
+        def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+          // If result is from next partition in order
+          if (index - 1 == lastIndex) {
+            batchWriter.writeBatches(arrowBatches.iterator)
+            lastIndex += 1
+            // Write stored partitions that come next in order
+            while (lastIndex < results.length && results(lastIndex) != null) {
+              batchWriter.writeBatches(results(lastIndex).iterator)
+              results(lastIndex) = null
+              lastIndex += 1
+            }
+            // After last batch, end the stream
+            if (lastIndex == results.length) {
+              batchWriter.end()
+            }
+          } else {
+            // Store partitions received out of order
+            results(index - 1) = arrowBatches
+          }
+        }
+
+        sparkSession.sparkContext.runJob(
+          arrowBatchRdd,
+          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
+          0 until numPartitions,
+          handlePartitionBatches)
--- End diff --

> is it better to incrementally run job on partitions in order

I believe this is how `toLocalIterator` works, right? I tried using that 
because it keeps only 1 partition in memory at a time, but performance 
took quite a hit from the multiple jobs. I think we should still prioritize 
performance over memory for `toPandas()`, since the data to be collected is 
assumed to be relatively small.

I did have another idea, though: we could stream all partitions to Python 
out of order, then follow with another small batch of data that maps 
partitionIndex to orderReceived. The partitions could then be put into 
order on the Python side before making the Pandas DataFrame.
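
To make the second idea concrete, here is a small Python sketch of the reordering step under stated assumptions: `received` holds each partition's batches in arrival order, and `arrival_position` is the trailing map from partitionIndex to the position at which that partition arrived. The function name and both parameter names are hypothetical; nothing here is existing PySpark API.

```python
from typing import Dict, List, Sequence


def reorder_partitions(received: Sequence[List[bytes]],
                       arrival_position: Dict[int, int]) -> List[List[bytes]]:
    """Return partitions in partition-index order.

    received[k] holds the batches of the k-th partition to arrive;
    arrival_position maps partitionIndex -> k.
    """
    if sorted(arrival_position.values()) != list(range(len(received))):
        raise ValueError("arrival_position must cover every received partition exactly once")
    return [received[arrival_position[i]] for i in sorted(arrival_position)]


# Partitions 2, 0, 1 arrived in that order.
received = [[b"p2"], [b"p0a", b"p0b"], [b"p1"]]
ordered = reorder_partitions(received, {0: 1, 1: 2, 2: 0})
```

The trade-off in this sketch is that the JVM side never buffers a partition, while the Python side must hold all of them until the map arrives.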


---




[GitHub] spark issue #21550: [SPARK-24543][SQL] Support any type as DDL string for fr...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21550
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91844/
Test PASSed.


---




[GitHub] spark issue #21550: [SPARK-24543][SQL] Support any type as DDL string for fr...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21550
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...

2018-06-14 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/21529
  
Thanks @gatorsmile. Sorry, may I ask what you think about 
https://github.com/apache/spark/pull/21529#issuecomment-396707622? Thanks.


---




[GitHub] spark issue #21550: [SPARK-24543][SQL] Support any type as DDL string for fr...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21550
  
**[Test build #91844 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91844/testReport)**
 for PR 21550 at commit 
[`af946b8`](https://github.com/apache/spark/commit/af946b8ada5af91428e7ab44478e920308846a59).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #21558: [SPARK-24552][SQL] Use task ID instead of attempt number...

2018-06-14 Thread tgravescs
Github user tgravescs commented on the issue:

https://github.com/apache/spark/pull/21558
  
Sorry, I'm trying to catch up on this thread. Are we saying this is a bug in 
the existing output committer as well when we have a fetch failure?


---




[GitHub] spark issue #21476: [SPARK-24446][yarn] Properly quote library path for YARN...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21476
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21476: [SPARK-24446][yarn] Properly quote library path for YARN...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21476
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4039/
Test PASSed.


---




[GitHub] spark pull request #21529: [SPARK-24495][SQL] EnsureRequirement returns wron...

2018-06-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21529


---




[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...

2018-06-14 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/21529
  
Thanks! Merged to master/2.3


---




[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20636
  
**[Test build #91858 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91858/testReport)**
 for PR 20636 at commit 
[`aca2ee6`](https://github.com/apache/spark/commit/aca2ee645f368a74451f5147bb3662786120e0d1).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91858/
Test FAILed.


---




[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21529
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21529
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91843/
Test PASSed.


---




[GitHub] spark issue #21529: [SPARK-24495][SQL] EnsureRequirement returns wrong plan ...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/21529
  
**[Test build #91843 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91843/testReport)**
 for PR 21529 at commit 
[`6ef4f0d`](https://github.com/apache/spark/commit/6ef4f0df7590f0da5aa900f29292ec0fe94658fb).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20636
  
**[Test build #91858 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91858/testReport)**
 for PR 20636 at commit 
[`aca2ee6`](https://github.com/apache/spark/commit/aca2ee645f368a74451f5147bb3662786120e0d1).


---




[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmark bench...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21288
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4037/
Test PASSed.


---




[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20636
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/4038/
Test PASSed.


---



