Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21366#discussion_r194184310
  
    --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.scheduler.cluster.k8s
    +
    +import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}
    +
    +import com.google.common.collect.Lists
    +import io.fabric8.kubernetes.api.model.Pod
    +import io.reactivex.disposables.Disposable
    +import io.reactivex.functions.Consumer
    +import io.reactivex.schedulers.Schedulers
    +import io.reactivex.subjects.PublishSubject
    +import javax.annotation.concurrent.GuardedBy
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.util.{ThreadUtils, Utils}
    +
    +private[spark] class ExecutorPodsSnapshotsStoreImpl(
    +    bufferSnapshotsExecutor: ScheduledExecutorService,
    +    executeSubscriptionsExecutor: ExecutorService)
    +  extends ExecutorPodsSnapshotsStore {
    +
    +  private val SNAPSHOT_LOCK = new Object()
    +
    +  private val snapshotsObservable = PublishSubject.create[ExecutorPodsSnapshot]()
    +  private val observedDisposables = mutable.Buffer.empty[Disposable]
    +
    +  @GuardedBy("SNAPSHOT_LOCK")
    +  private var currentSnapshot = ExecutorPodsSnapshot()
    +
    +  override def addSubscriber(
    +      processBatchIntervalMillis: Long)
    +      (subscriber: ExecutorPodsSnapshot => Unit): Unit = {
    +    observedDisposables += snapshotsObservable
    +      // Group events in the time window given by the caller. These buffers are then sent
    +      // to the caller's lambda at the given interval, with the pod updates that occurred
    +      // in that given interval.
    +      .buffer(
    +        processBatchIntervalMillis,
    +        TimeUnit.MILLISECONDS,
    +        // For testing - specifically use the given scheduled executor service to trigger
    +        // buffer boundaries. Allows us to inject a deterministic scheduler here.
    +        Schedulers.from(bufferSnapshotsExecutor))
    +      // Trigger an event cycle immediately. Not strictly required to be fully correct, but
    +      // in particular the pod allocator should try to request executors immediately instead
    +      // of waiting for one pod allocation delay.
    +      .startWith(Lists.newArrayList(ExecutorPodsSnapshot()))
    +      // Force all triggered events - both the initial event above and the buffered ones in
    +      // the following time windows - to execute asynchronously to this call's thread.
    +      .observeOn(Schedulers.from(executeSubscriptionsExecutor))
    +      .subscribe(toReactivexConsumer { snapshots: java.util.List[ExecutorPodsSnapshot] =>
    +        Utils.tryLogNonFatalError {
    +          snapshots.asScala.foreach(subscriber)
    +        }
    +      })
    +  }
    +
    +  override def stop(): Unit = {
    +    observedDisposables.foreach(_.dispose())
    +    snapshotsObservable.onComplete()
    +    ThreadUtils.shutdown(bufferSnapshotsExecutor)
    +    ThreadUtils.shutdown(executeSubscriptionsExecutor)
    +  }
    +
    +  override def updatePod(updatedPod: Pod): Unit = SNAPSHOT_LOCK.synchronized {
    +    currentSnapshot = currentSnapshot.withUpdate(updatedPod)
    --- End diff --
    
    A brief remark: the snapshot still might not be 100% accurate because of failures in the information flow (missing events, etc.) - but for all the events we do get, we should be trying to handle them all as best we can.
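
    For illustration only (not part of this PR), here is a minimal sketch of how a subscriber registered through addSubscriber would consume batched snapshots: it treats each snapshot it does receive as the authoritative view at that moment, rather than trying to reconstruct events that were dropped upstream. The executorPods accessor on ExecutorPodsSnapshot is assumed here purely for the sake of the example.

        package org.apache.spark.scheduler.cluster.k8s

        import java.util.concurrent.{Executors, TimeUnit}

        object SnapshotsStoreUsageSketch {
          def main(args: Array[String]): Unit = {
            // The store takes a scheduled executor that drives buffer boundaries and a
            // plain executor on which the subscriber callbacks run.
            val bufferExecutor = Executors.newSingleThreadScheduledExecutor()
            val subscribersExecutor = Executors.newSingleThreadExecutor()
            val store = new ExecutorPodsSnapshotsStoreImpl(bufferExecutor, subscribersExecutor)

            // Process whatever state we do observe; missed events upstream only mean the
            // snapshot is stale, never that this callback is skipped.
            store.addSubscriber(processBatchIntervalMillis = 1000L) { snapshot =>
              // executorPods is assumed here to expose the pods known to this snapshot.
              println(s"Snapshot with ${snapshot.executorPods.size} executor pods")
            }

            // ... watch / polling sources would call store.updatePod(...) here ...

            TimeUnit.SECONDS.sleep(5)
            store.stop()  // disposes subscriptions and shuts down both executors
          }
        }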

