[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-24 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190769478
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class ExecutorPodsPollingEventSource(
+    kubernetesClient: KubernetesClient,
+    eventHandler: ExecutorPodsEventHandler,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private var pollingFuture: Future[_] = null
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
--- End diff --

Done, see below.
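
(For anyone following the thread: a minimal sketch of what the amended stop()
could look like, assuming the event source owns pollingExecutor and is
responsible for releasing its threads; the exact change is in the updated
diff.)

    def stop(): Unit = {
      if (pollingFuture != null) {
        pollingFuture.cancel(true)
        pollingFuture = null
      }
      // Also release the polling threads; shutdownNow() interrupts an
      // in-flight poll, if any.
      pollingExecutor.shutdownNow()
    }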


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-24 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190762965
  
--- Diff: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsEventQueue.scala
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+import scala.collection.mutable
+
+class DeterministicExecutorPodsEventQueue extends ExecutorPodsEventQueue {
+
+  private val eventBuffer = mutable.Buffer.empty[Pod]
+  private val subscribers = mutable.Buffer.empty[(Seq[Pod]) => Unit]
+
+  override def addSubscriber
+      (processBatchIntervalMillis: Long)
+      (onNextBatch: (Seq[Pod]) => Unit): Unit = {
+    subscribers += onNextBatch
+  }
+
+  override def stopProcessingEvents(): Unit = {}
+
+  override def pushPodUpdate(updatedPod: Pod): Unit = eventBuffer += updatedPod
--- End diff --

Yup, basically just a live stream of the pod statuses as reported by the API.
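
(A short sketch of what a subscriber sees under that model: each element of a
batch is a plain fabric8 Pod snapshot rather than a wrapper event type. The
interval value here is illustrative.)

    eventQueue.addSubscriber(processBatchIntervalMillis = 1000L) { pods =>
      pods.foreach { pod =>
        // The pod's phase is read straight off the snapshot's status.
        println(s"${pod.getMetadata.getName} -> ${pod.getStatus.getPhase}")
      }
    }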


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-24 Thread erikerlandson
Github user erikerlandson commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190755750
  
--- Diff: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsEventQueue.scala
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+import scala.collection.mutable
+
+class DeterministicExecutorPodsEventQueue extends ExecutorPodsEventQueue {
+
+  private val eventBuffer = mutable.Buffer.empty[Pod]
+  private val subscribers = mutable.Buffer.empty[(Seq[Pod]) => Unit]
+
+  override def addSubscriber
+      (processBatchIntervalMillis: Long)
+      (onNextBatch: (Seq[Pod]) => Unit): Unit = {
+    subscribers += onNextBatch
+  }
+
+  override def stopProcessingEvents(): Unit = {}
+
+  override def pushPodUpdate(updatedPod: Pod): Unit = eventBuffer += updatedPod
--- End diff --

So events are pods themselves, as opposed to some event structure on pods?


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190398282
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    require(eventProcessorFuture == null, "Cannot start event processing twice.")
+    logInfo(s"Starting Kubernetes executor pods event handler for application with" +
+      s" id $applicationId.")
+    val eventProcessor = new Runnable {
+      override def run(): Unit = {
+        Utils.tryLogNonFatalError {
+          processEvents(applicationId, schedulerBackend)
+        }
+      }
+    }
+    eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+      eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (eventProcessorFuture != null) {
+      eventProcessorFuture.cancel(true)
+      eventProcessorFuture = null
+    }
+  }
+
+  private def processEvents(
+      applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) {
+    val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size())
+   

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190389782
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190386369
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190385827
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
      <version>1.10.19</version>
      <scope>test</scope>
    </dependency>
+  
--- End diff --

We always add to the top-level pom, and then in the lower-level poms we 
reference the dependencies without listing their versions.
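
(A hypothetical illustration of that convention; the artifact and property
names are made up, not the ones added in this diff. The top-level pom pins
the version under dependencyManagement:

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.example</groupId>
          <artifactId>example-lib</artifactId>
          <version>${example-lib.version}</version>
        </dependency>
      </dependencies>
    </dependencyManagement>

and a lower-level module pom then declares the dependency without a version:

    <dependency>
      <groupId>com.example</groupId>
      <artifactId>example-lib</artifactId>
      <scope>test</scope>
    </dependency>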


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190372105
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+   

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190367677
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190367420
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190366267
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchEventSource.scala
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, 
KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsWatchEventSource(
+    eventHandler: ExecutorPodsEventHandler,
+    kubernetesClient: KubernetesClient) extends Logging {
+
+  private var watchConnection: Closeable = null
--- End diff --

In general `start` and `stop` actions happen on their own threads; i.e. 
there shouldn't be concurrent threads trying to `start` and `stop` any 
component at the same time. So I think it's fine to leave data structures 
that are only accessed in `start` and `stop` non-thread-safe.
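
(A minimal sketch of the lifecycle pattern being described: a plain var plus
a require guard, relying on the caller to drive start and stop from a single
thread at a time. ExecutorPodsWatcher is a stand-in Watcher[Pod]
implementation here.)

    private var watchConnection: Closeable = null

    def start(applicationId: String): Unit = {
      require(watchConnection == null, "Cannot start the watch more than once.")
      watchConnection = kubernetesClient.pods()
        .withLabel(SPARK_APP_ID_LABEL, applicationId)
        .watch(new ExecutorPodsWatcher())
    }

    def stop(): Unit = {
      if (watchConnection != null) {
        watchConnection.close()
        watchConnection = null
      }
    }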


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190365981
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190365332
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190356134
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+   

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190356590
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class ExecutorPodsPollingEventSource(
+    kubernetesClient: KubernetesClient,
+    eventHandler: ExecutorPodsEventHandler,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS)
--- End diff --

Agreed.
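
(If what is being agreed to here is making the hard-coded 30-second interval
configurable, a sketch under that assumption; the config key is hypothetical,
and a SparkConf would have to be threaded through to this class.)

    private val pollingIntervalMs =
      conf.getTimeAsMs("spark.kubernetes.executor.apiPollingInterval", "30s")

    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
      new PollRunnable(applicationId), 0L, pollingIntervalMs, TimeUnit.MILLISECONDS)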


---




[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190354568
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, 
ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+conf: SparkConf,
+executorBuilder: KubernetesExecutorBuilder,
+kubernetesClient: KubernetesClient,
+eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = 
conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = 
conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(throw new SparkException("Must specify the driver pod 
name"))
+
+  private val driverPod = kubernetesClient.pods()
+.withName(kubernetesDriverPodName)
+.get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+require(eventProcessorFuture == null, "Cannot start event processing 
twice.")
+logInfo(s"Starting Kubernetes executor pods event handler for 
application with" +
+  s" id $applicationId.")
+val eventProcessor = new Runnable {
+  override def run(): Unit = {
+Utils.tryLogNonFatalError {
+  processEvents(applicationId, schedulerBackend)
+}
+  }
+}
+eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+  eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+if (eventProcessorFuture != null) {
+  eventProcessorFuture.cancel(true)
+  eventProcessorFuture = null
+}
+  }
+
+  private def processEvents(
+  applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend) {
+val currentEvents = new 
java.util.ArrayList[Seq[Pod]](eventQueue.size())
+   

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190356761
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class ExecutorPodsPollingEventSource(
+    kubernetesClient: KubernetesClient,
+    eventHandler: ExecutorPodsEventHandler,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private var pollingFuture: Future[_] = null
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    if (pollingFuture != null) {
+      pollingFuture.cancel(true)
+      pollingFuture = null
--- End diff --

Ditto. The `pollingExecutor` should be shut down properly.
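
One possible shape for this, as a sketch that assumes the event source owns the executor's lifecycle:

    def stop(): Unit = {
      if (pollingFuture != null) {
        pollingFuture.cancel(true)
        pollingFuture = null
      }
      // Stop the executor's threads as well, so a non-daemon polling thread
      // cannot keep the JVM alive after the source is stopped.
      pollingExecutor.shutdownNow()
    }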


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190350967
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    require(eventProcessorFuture == null, "Cannot start event processing twice.")
+    logInfo(s"Starting Kubernetes executor pods event handler for application with" +
+      s" id $applicationId.")
+    val eventProcessor = new Runnable {
+      override def run(): Unit = {
+        Utils.tryLogNonFatalError {
+          processEvents(applicationId, schedulerBackend)
+        }
+      }
+    }
+    eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+      eventProcessor, 0L, podAllocationDelay, TimeUnit.MILLISECONDS)
+  }
+
+  def stop(): Unit = {
+    if (eventProcessorFuture != null) {
+      eventProcessorFuture.cancel(true)
+      eventProcessorFuture = null
+    }
+  }
+
+  private def processEvents(
+      applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) {
+    val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size())

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190350563
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    eventProcessorExecutor: ScheduledExecutorService) extends Logging {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val podAllocationDelay = conf.get(KUBERNETES_ALLOCATION_BATCH_DELAY)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = mutable.Set.empty[Long]
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes an executor is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = mutable.Set.empty[Long]
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    require(eventProcessorFuture == null, "Cannot start event processing twice.")
+    logInfo(s"Starting Kubernetes executor pods event handler for application with" +
+      s" id $applicationId.")
+    val eventProcessor = new Runnable {
+      override def run(): Unit = {
+        Utils.tryLogNonFatalError {
+          processEvents(applicationId, schedulerBackend)
+        }
+      }
+    }
+    eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
--- End diff --

The `eventProcessorExecutor` should be shut down properly. Or you can use 
https://google.github.io/guava/releases/17.0/api/docs/com/google/common/util/concurrent/MoreExecutors.html#getExitingScheduledExecutorService(java.util.concurrent.ScheduledThreadPoolExecutor)
 to avoid having to shut it down manually.
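
For illustration, a minimal sketch of the Guava approach (the wiring here is hypothetical, not the patch itself):

    import java.util.concurrent.ScheduledThreadPoolExecutor
    import com.google.common.util.concurrent.MoreExecutors

    // Wraps the executor so its threads are daemons and it is shut down via a
    // JVM shutdown hook, removing the need for a manual shutdownNow() call.
    val eventProcessorExecutor = MoreExecutors.getExitingScheduledExecutorService(
      new ScheduledThreadPoolExecutor(1))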


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190358300
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchEventSource.scala
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsWatchEventSource(
+    eventHandler: ExecutorPodsEventHandler,
+    kubernetesClient: KubernetesClient) extends Logging {
+
+  private var watchConnection: Closeable = null
--- End diff --

Ditto for `pollingFuture` above.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190358018
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchEventSource.scala
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Closeable
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsWatchEventSource(
+    eventHandler: ExecutorPodsEventHandler,
+    kubernetesClient: KubernetesClient) extends Logging {
+
+  private var watchConnection: Closeable = null
--- End diff --

Is there an equivalent to Java's `volatile` in Scala? If so, 
`watchConnection` should be `volatile` so your `watchConnection == null` check 
below is safe.
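
For reference, Scala's equivalent is the `@volatile` annotation, e.g.:

    @volatile private var watchConnection: Closeable = null

which gives the field the same memory-visibility semantics as a Java `volatile` field.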


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-23 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r190349318
  
--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
 1.10.19
 test
   
+  
--- End diff --

Why add this to the top-level pom?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r189739169
  
--- Diff: pom.xml ---
@@ -150,6 +150,7 @@
 
 4.5.4
 4.4.8
+3.0.1
--- End diff --

Noted, will remove in the next push.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-21 Thread erikerlandson
Github user erikerlandson commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r189735073
  
--- Diff: pom.xml ---
@@ -150,6 +150,7 @@
 
 4.5.4
 4.4.8
+3.0.1
--- End diff --

My take is that the performance is probably not worth the additional 
dependency.
I also noticed that the trove dep is LGPL, which is considered incompatible 
with the Apache license. Although I believe this is not a show-stopper with 
respect to "containerized" dependencies, it probably is for a direct dependency 
in the code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-21 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r189642632
  
--- Diff: pom.xml ---
@@ -150,6 +150,7 @@
 
 4.5.4
 4.4.8
+3.0.1
--- End diff --

These are data structures optimized for storing primitives. Functionally 
speaking, we could use standard Scala collections here.
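
For example, a standard-library version could look like this (a sketch; boxing overhead is the trade-off):

    import scala.collection.mutable

    // Plain Scala sets of executor ids: boxed Longs instead of primitives.
    private val pendingExecutors = mutable.Set.empty[Long]
    private val runningExecutors = mutable.Set.empty[Long]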


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-19 Thread erikerlandson
Github user erikerlandson commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r189435162
  
--- Diff: pom.xml ---
@@ -150,6 +150,7 @@
 
 4.5.4
 4.4.8
+3.0.1
--- End diff --

It looks like this dep is being taken on for a couple of data structures: are 
they important, or could they be replaced with Scala's Array and HashMap?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-18 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r189401135
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class ExecutorPodsPollingEventSource(
+    kubernetesClient: KubernetesClient,
+    eventHandler: ExecutorPodsEventHandler,
+    pollingExecutor: ScheduledExecutorService) {
+
+  private var pollingFuture: Future[_] = _
+
+  def start(applicationId: String): Unit = {
+    require(pollingFuture == null, "Cannot start polling more than once.")
+    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
+      new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS)
--- End diff --

We should make this interval, and others like it, configurable.
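
For example, assuming Spark's internal `ConfigBuilder` API and a hypothetical key name:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.internal.config.ConfigBuilder

    // Illustrative config entry; the 30s default mirrors the hard-coded value above.
    val KUBERNETES_EXECUTOR_API_POLLING_INTERVAL =
      ConfigBuilder("spark.kubernetes.executor.apiPollingInterval")
        .doc("Interval between polls against the Kubernetes API server to inspect the " +
          "state of executors.")
        .timeConf(TimeUnit.MILLISECONDS)
        .createWithDefaultString("30s")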


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-18 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r189400912
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingEventSource.scala
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, ScheduledExecutorService, TimeUnit}
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.Constants._
+
+private[spark] class ExecutorPodsPollingEventSource(
--- End diff --

It's noteworthy that the resync polls could also be done in 
`ExecutorPodsEventHandler#processEvents`. The reason we don't is that we 
probably want the resync polls to occur on a different interval than the event 
handling passes. You may, for example, ask for the event handler to trigger 
very frequently so that pod updates are dealt with promptly, but you don't want 
to be polling the API server every 5 seconds.
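
A sketch of the two independent schedules, using the intervals from this revision of the diff:

    // Event handling pass: frequent and cheap, it only drains the local queue.
    eventProcessorExecutor.scheduleWithFixedDelay(eventProcessor, 0L, 5L, TimeUnit.SECONDS)

    // Resync poll: infrequent, since each run hits the Kubernetes API server.
    pollingFuture = pollingExecutor.scheduleWithFixedDelay(
      new PollRunnable(applicationId), 0L, 30L, TimeUnit.SECONDS)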


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-18 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r189400286
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import gnu.trove.list.array.TLongArrayList
+import gnu.trove.set.hash.TLongHashSet
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    eventProcessorExecutor: ScheduledExecutorService) {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = new TLongHashSet()
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes a scheduler is running is dictated by the K8s API rather than Spark's RPC events.
+  // We may need to consider where these perspectives may differ and which perspective should
+  // take precedence.
+  private val runningExecutors = new TLongHashSet()
+
+  private var eventProcessorFuture: Future[_] = _
+
+  def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    require(eventProcessorFuture == null, "Cannot start event processing twice.")
+    val eventProcessor = new Runnable {
+      override def run(): Unit = processEvents(applicationId, schedulerBackend)
+    }
+    eventProcessorFuture = eventProcessorExecutor.scheduleWithFixedDelay(
+      eventProcessor, 0L, 5L, TimeUnit.SECONDS)
+  }
+
+  def stop(): Unit = {
+    if (eventProcessorFuture != null) {
+      eventProcessorFuture.cancel(true)
+      eventProcessorFuture = null
+    }
+  }
+
+  private def processEvents(
+      applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend) {
+    val currentEvents = new java.util.ArrayList[Seq[Pod]](eventQueue.size())
+    eventQueue.drainTo(currentEvents)
+    currentEvents.asScala.flatten.foreach { updatedPod =>
+      val execId = updatedPod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong
+      val podPhase = updatedPod.getStatus.getPhase.toLowerCase
+      if

[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-18 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/21366#discussion_r189399432
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsEventHandler.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.{Future, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
+
+import gnu.trove.list.array.TLongArrayList
+import gnu.trove.set.hash.TLongHashSet
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.scheduler.ExecutorExited
+import org.apache.spark.util.Utils
+
+private[spark] class ExecutorPodsEventHandler(
+    conf: SparkConf,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    eventProcessorExecutor: ScheduledExecutorService) {
+
+  import ExecutorPodsEventHandler._
+
+  private val EXECUTOR_ID_COUNTER = new AtomicLong(0L)
+
+  private val totalExpectedExecutors = new AtomicInteger(0)
+
+  private val eventQueue = new LinkedBlockingQueue[Seq[Pod]]()
+
+  private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(throw new SparkException("Must specify the driver pod name"))
+
+  private val driverPod = kubernetesClient.pods()
+    .withName(kubernetesDriverPodName)
+    .get()
+
+  // Use sets of ids instead of counters to be able to handle duplicate events.
+
+  // Executor IDs that have been requested from Kubernetes but are not running yet.
+  private val pendingExecutors = new TLongHashSet()
+
+  // We could use CoarseGrainedSchedulerBackend#totalRegisteredExecutors here for tallying the
+  // executors that are running. But, here we choose instead to maintain all state within this
+  // class from the perspective of the k8s API. Therefore whether or not this scheduler loop
+  // believes a scheduler is running is dictated by the K8s API rather than Spark's RPC events.
--- End diff --

believes an executor is running*


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21366: [SPARK-24248][K8S][WIP] Use the Kubernetes API to...

2018-05-18 Thread mccheah
GitHub user mccheah opened a pull request:

https://github.com/apache/spark/pull/21366

[SPARK-24248][K8S][WIP] Use the Kubernetes API to populate an event queue 
for scheduling

## What changes were proposed in this pull request?

Previously, the scheduler backend maintained state in many places, both for 
reading and for writing. For example, state had to be managed in both the watch 
and the executor allocator runnable. Furthermore, one had to keep track of 
multiple hash tables.

We can do better here by:

1. Consolidating the places where we manage state. Here, we take 
inspiration from traditional Kubernetes controllers. These controllers tend to 
implement an event queue which is populated by two sources: a watch connection, 
and a periodic poller. Controllers typically use both mechanisms for 
redundancy; the watch connection may drop, so the periodic polling serves as a 
backup. Both sources write pod updates to a single event queue and then a 
processor periodically processes the current state of pods as reported by the 
two sources.

2. Storing less specialized in-memory state in general. Previously we were 
creating hash tables to represent the state of executors. Instead, it's easier 
to represent state solely by the event queue, which has predictable read/write 
patterns and is more or less just a local up-to-date cache of the cluster's 
status.

## How was this patch tested?

Integration tests should verify that there are no regressions end to end. Unit 
tests are to be updated, focusing in particular on different orderings of 
events, including events that arrive in unexpected order.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/palantir/spark event-queue-driven-scheduling

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21366.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21366


commit 310263c253a8c4a3748cab5b5a7698e076695cd6
Author: mcheah 
Date:   2018-05-18T20:39:47Z

[SPARK-24248][K8S] Use the Kubernetes API to populate an event queue for 
scheduling

Previously, the scheduler backend was maintaining state in many places,
not only for reading state but also writing to it. For example, state
had to be managed in both the watch and in the executor allocator
runnable. Furthermore one had to keep track of multiple hash tables.

We can do better here by:

(1) Consolidating the places where we manage state. Here, we take
inspiration from traditional Kubernetes controllers. These controllers
tend to implement an event queue which is populated by two sources: a
watch connection, and a periodic poller. Controllers typically use both
mechanisms for redundancy; the watch connection may drop, so the
periodic polling serves as a backup. Both sources write pod updates to a
single event queue and then a processor periodically processes the
current state of pods as reported by the two sources.

(2) Storing less specialized in-memory state in general. Previously we
were creating hash tables to represent the state of executors. Instead,
it's easier to represent state solely by the event queue, which has
predictable read/write patterns and is more or less just a local
up-to-date cache of the cluster's status.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org